Revert "HBASE-11544 [Ergonomics] hbase.client.scanner.caching is dogged and will try to return batch even if it means OOME" The original commit message referenced the wrong JIRA, so revert it; an addendum is also on the way.
This reverts commit 26ba621e47e886fb3b1336f2201b8efbda86ff91. Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8cd3001f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8cd3001f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8cd3001f Branch: refs/heads/master Commit: 8cd3001f817915df20a4d209c450ac9b69b915d7 Parents: 26ba621 Author: stack <[email protected]> Authored: Wed Apr 8 09:32:09 2015 -0700 Committer: stack <[email protected]> Committed: Wed Apr 8 09:32:09 2015 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hbase/HTableDescriptor.java | 22 +- .../hadoop/hbase/client/ClientScanner.java | 3 - .../client/ScannerCallableWithReplicas.java | 34 +- .../java/org/apache/hadoop/hbase/CellUtil.java | 1 - .../org/apache/hadoop/hbase/HConstants.java | 19 +- .../coprocessor/example/BulkDeleteEndpoint.java | 3 +- .../coprocessor/example/RowCountEndpoint.java | 5 +- .../hbase/client/ClientSideRegionScanner.java | 8 +- .../coprocessor/AggregateImplementation.java | 15 +- .../hadoop/hbase/regionserver/HRegion.java | 271 +++++----- .../hbase/regionserver/InternalScanner.java | 209 +++++++- .../hadoop/hbase/regionserver/KeyValueHeap.java | 44 +- .../regionserver/NoLimitScannerContext.java | 94 ---- .../hbase/regionserver/RSRpcServices.java | 66 ++- .../hbase/regionserver/RegionScanner.java | 50 +- .../hbase/regionserver/ScannerContext.java | 527 ------------------- .../hadoop/hbase/regionserver/StoreFlusher.java | 7 +- .../hadoop/hbase/regionserver/StoreScanner.java | 78 +-- .../regionserver/compactions/Compactor.java | 7 +- .../security/access/AccessControlLists.java | 3 +- .../hbase/security/access/AccessController.java | 6 +- .../org/apache/hadoop/hbase/HBaseTestCase.java | 3 +- .../hbase/TestPartialResultsFromClientSide.java | 8 +- .../hbase/client/TestIntraRowPagination.java | 3 +- .../hadoop/hbase/client/TestReplicasClient.java | 90 +--- 
.../coprocessor/ColumnAggregationEndpoint.java | 3 +- .../ColumnAggregationEndpointNullResponse.java | 3 +- .../ColumnAggregationEndpointWithErrors.java | 3 +- .../coprocessor/TestCoprocessorInterface.java | 23 +- .../TestRegionObserverInterface.java | 19 +- .../hbase/filter/TestColumnPrefixFilter.java | 7 +- .../hbase/filter/TestDependentColumnFilter.java | 3 +- .../apache/hadoop/hbase/filter/TestFilter.java | 29 +- .../filter/TestInvocationRecordFilter.java | 5 +- .../filter/TestMultipleColumnPrefixFilter.java | 9 +- .../hbase/io/encoding/TestPrefixTree.java | 11 +- .../TestScannerSelectionUsingKeyRange.java | 5 +- .../io/hfile/TestScannerSelectionUsingTTL.java | 3 +- .../hbase/regionserver/TestAtomicOperation.java | 9 +- .../hbase/regionserver/TestBlocksScanned.java | 8 +- .../hbase/regionserver/TestColumnSeeking.java | 5 +- .../hbase/regionserver/TestDefaultMemStore.java | 9 +- .../regionserver/TestGetClosestAtOrBefore.java | 5 +- .../hadoop/hbase/regionserver/TestHRegion.java | 104 ++-- .../hbase/regionserver/TestKeepDeletes.java | 6 +- .../hbase/regionserver/TestMajorCompaction.java | 9 +- .../regionserver/TestMultiColumnScanner.java | 3 +- .../TestRegionMergeTransaction.java | 3 +- .../regionserver/TestReversibleScanners.java | 3 +- .../regionserver/TestScanWithBloomError.java | 3 +- .../hadoop/hbase/regionserver/TestScanner.java | 11 +- .../regionserver/TestSeekOptimizations.java | 3 +- .../regionserver/TestSplitTransaction.java | 3 +- .../hbase/regionserver/TestStoreScanner.java | 53 +- .../hbase/regionserver/TestStripeCompactor.java | 16 +- .../hbase/regionserver/TestWideScanner.java | 3 +- .../compactions/TestStripeCompactionPolicy.java | 18 +- .../hbase/regionserver/wal/TestWALReplay.java | 3 +- .../apache/hadoop/hbase/util/TestMergeTool.java | 3 +- 59 files changed, 787 insertions(+), 1192 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java index 4fae2c7..a0ab484 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -159,9 +159,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { */ @Deprecated public static final String DEFERRED_LOG_FLUSH = "DEFERRED_LOG_FLUSH"; - /** - * @deprecated - */ @Deprecated private static final Bytes DEFERRED_LOG_FLUSH_KEY = new Bytes(Bytes.toBytes(DEFERRED_LOG_FLUSH)); @@ -318,7 +315,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { * Construct a table descriptor specifying a byte array table name * @param name Table name. * @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a> - * @deprecated */ @Deprecated public HTableDescriptor(final byte[] name) { @@ -329,7 +325,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { * Construct a table descriptor specifying a String table name * @param name Table name. * @see <a href="HADOOP-1581">HADOOP-1581 HBASE: Un-openable tablename bug</a> - * @deprecated */ @Deprecated public HTableDescriptor(final String name) { @@ -704,7 +699,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { * Set the name of the table. 
* * @param name name of table - * @deprecated */ @Deprecated public HTableDescriptor setName(byte[] name) { @@ -712,9 +706,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { return this; } - /** - * @deprecated - */ @Deprecated public HTableDescriptor setName(TableName name) { this.name = name; @@ -1349,7 +1340,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { * @param rootdir qualified path of HBase root directory * @param tableName name of table * @return {@link Path} for table - * @deprecated */ @Deprecated public static Path getTableDir(Path rootdir, final byte [] tableName) { @@ -1363,7 +1353,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { /** Table descriptor for <code>hbase:meta</code> catalog table * Deprecated, use TableDescriptors#get(TableName.META_TABLE) or * Admin#getTableDescriptor(TableName.META_TABLE) instead. - * @deprecated */ @Deprecated public static final HTableDescriptor META_TABLEDESC = new HTableDescriptor( @@ -1423,18 +1412,12 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { .setCacheDataInL1(true) }); - /** - * @deprecated - */ @Deprecated public HTableDescriptor setOwner(User owner) { return setOwnerString(owner != null ? owner.getShortName() : null); } - /** - * used by admin.rb:alter(table_name,*args) to update owner. - * @deprecated - */ + // used by admin.rb:alter(table_name,*args) to update owner. 
@Deprecated public HTableDescriptor setOwnerString(String ownerString) { if (ownerString != null) { @@ -1445,9 +1428,6 @@ public class HTableDescriptor implements Comparable<HTableDescriptor> { return this; } - /** - * @deprecated - */ @Deprecated public String getOwnerString() { if (getValue(OWNER_KEY) != null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index ccd8c2d..05a780c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -401,9 +401,6 @@ public class ClientScanner extends AbstractClientScanner { // happens for the cases where we see exceptions. 
Since only openScanner // would have happened, values would be null if (values == null && callable.switchedToADifferentReplica()) { - // Any accumulated partial results are no longer valid since the callable will - // openScanner with the correct startkey and we must pick up from there - clearPartialResults(); this.currentRegion = callable.getHRegionInfo(); continue; } http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 7ba152b..ca6ab05 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -292,7 +292,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { continue; //this was already scheduled earlier } ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id); - setStartRowForReplicaCallable(s); + + if (this.lastResult != null) { + if(s.getScan().isReversed()){ + s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow())); + }else { + s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1])); + } + } outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s); cs.submit(retryingOnReplica, scannerTimeout, id); @@ -300,31 +307,6 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { return max - min + 1; } - /** - * Set the start row for the replica callable based on the state of the last result received. 
- * @param callable The callable to set the start row on - */ - private void setStartRowForReplicaCallable(ScannerCallable callable) { - if (this.lastResult == null || callable == null) return; - - if (this.lastResult.isPartial()) { - // The last result was a partial result which means we have not received all of the cells - // for this row. Thus, use the last result's row as the start row. If a replica switch - // occurs, the scanner will ensure that any accumulated partial results are cleared, - // and the scan can resume from this row. - callable.getScan().setStartRow(this.lastResult.getRow()); - } else { - // The last result was not a partial result which means it contained all of the cells for - // that row (we no longer need any information from it). Set the start row to the next - // closest row that could be seen. - if (callable.getScan().isReversed()) { - callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow())); - } else { - callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1])); - } - } - } - @VisibleForTesting boolean isAnyRPCcancelled() { return someRPCcancelled; http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index 2060488..bce3957 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -574,7 +574,6 @@ public final class CellUtil { * backwards compatible with estimations done by older clients. We need to * pretend that tags never exist and cells aren't serialized with tag * length included. 
See HBASE-13262 and HBASE-13303 - * @deprecated See above comment */ @Deprecated public static long estimatedHeapSizeOfWithoutTags(final Cell cell) { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index b5a6318..19e251a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -391,7 +391,7 @@ public final class HConstants { /** * The hbase:meta table's name. - * @deprecated For 0.94 to 0.96 compatibility. Replaced by define in TableName + * */ @Deprecated // for compat from 0.94 -> 0.96. public static final byte[] META_TABLE_NAME = TableName.META_TABLE_NAME.getName(); @@ -579,7 +579,7 @@ public final class HConstants { * 1, 2, 3, 5, 10, 20, 40, 100, 100, 100. 
* With 100ms, a back-off of 200 means 20s */ - public static final int [] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200}; + public static final int RETRY_BACKOFF[] = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200}; public static final String REGION_IMPL = "hbase.hregion.impl"; @@ -780,8 +780,7 @@ public final class HConstants { /** * timeout for short operation RPC */ - public static final String HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY = - "hbase.rpc.shortoperation.timeout"; + public static final String HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY = "hbase.rpc.shortoperation.timeout"; /** * Default value of {@link #HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY} @@ -836,8 +835,8 @@ public final class HConstants { */ public static final float HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD = 0.2f; - public static final Pattern CP_HTD_ATTR_KEY_PATTERN = Pattern.compile( - "^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE); + public static final Pattern CP_HTD_ATTR_KEY_PATTERN = Pattern.compile + ("^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE); public static final Pattern CP_HTD_ATTR_VALUE_PATTERN = Pattern.compile("(^[^\\|]*)\\|([^\\|]+)\\|[\\s]*([\\d]*)[\\s]*(\\|.*)?$"); @@ -890,7 +889,7 @@ public final class HConstants { * 1 => Abort only all of the handers have died */ public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = - "hbase.regionserver.handler.abort.on.error.percent"; + "hbase.regionserver.handler.abort.on.error.percent"; public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5; //High priority handlers to deal with admin requests and system table operation requests @@ -950,8 +949,7 @@ public final class HConstants { public static final String DEFAULT_WAL_STORAGE_POLICY = "NONE"; /** Region in Transition metrics threshold time */ - public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD = - "hbase.metrics.rit.stuck.warning.threshold"; + public static final String 
METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold"; public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop"; @@ -1046,8 +1044,7 @@ public final class HConstants { * 0.0.0.0. * @see <a href="https://issues.apache.org/jira/browse/HBASE-9961">HBASE-9961</a> */ - public static final String STATUS_MULTICAST_BIND_ADDRESS = - "hbase.status.multicast.bind.address.ip"; + public static final String STATUS_MULTICAST_BIND_ADDRESS = "hbase.status.multicast.bind.address.ip"; public static final String DEFAULT_STATUS_MULTICAST_BIND_ADDRESS = "0.0.0.0"; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java index 93f98ac..e0c3bae 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.Bu import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -136,7 +137,7 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize); for (int i = 0; i < rowBatchSize; i++) { 
List<Cell> results = new ArrayList<Cell>(); - hasMore = scanner.next(results); + hasMore = NextState.hasMoreValues(scanner.next(results)); if (results.size() > 0) { deleteRows.add(results); } http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java ---------------------------------------------------------------------- diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java index 4309cdc..2afd05e 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.util.Bytes; import com.google.protobuf.RpcCallback; @@ -80,7 +81,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService byte[] lastRow = null; long count = 0; do { - hasMore = scanner.next(results); + hasMore = NextState.hasMoreValues(scanner.next(results)); for (Cell kv : results) { byte[] currentRow = CellUtil.cloneRow(kv); if (lastRow == null || !Bytes.equals(lastRow, currentRow)) { @@ -119,7 +120,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService boolean hasMore = false; long count = 0; do { - hasMore = scanner.next(results); + hasMore = NextState.hasMoreValues(scanner.next(results)); for (Cell kv : results) { count++; } 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java index 04b88e9..a80a07e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -29,10 +30,8 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.mortbay.log.Log; @@ -73,7 +72,10 @@ public class ClientSideRegionScanner extends AbstractClientScanner { public Result next() throws IOException { values.clear(); - scanner.nextRaw(values, NoLimitScannerContext.NO_LIMIT); + // negative values indicate no limits + final long remainingResultSize = -1; + final int batchLimit = -1; + scanner.nextRaw(values, batchLimit, remainingResultSize); if (values.isEmpty()) { //we are done return null; 
http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java index 81c933b..b6f834e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateReque import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse; import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import com.google.protobuf.ByteString; import com.google.protobuf.Message; @@ -91,7 +92,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { // qualifier can be null. 
boolean hasMoreRows = false; do { - hasMoreRows = scanner.next(results); + hasMoreRows = NextState.hasMoreValues(scanner.next(results)); int listSize = results.size(); for (int i = 0; i < listSize; i++) { temp = ci.getValue(colFamily, qualifier, results.get(i)); @@ -145,7 +146,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { } boolean hasMoreRows = false; do { - hasMoreRows = scanner.next(results); + hasMoreRows = NextState.hasMoreValues(scanner.next(results)); int listSize = results.size(); for (int i = 0; i < listSize; i++) { temp = ci.getValue(colFamily, qualifier, results.get(i)); @@ -199,7 +200,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { List<Cell> results = new ArrayList<Cell>(); boolean hasMoreRows = false; do { - hasMoreRows = scanner.next(results); + hasMoreRows = NextState.hasMoreValues(scanner.next(results)); int listSize = results.size(); for (int i = 0; i < listSize; i++) { temp = ci.getValue(colFamily, qualifier, results.get(i)); @@ -253,7 +254,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { scanner = env.getRegion().getScanner(scan); boolean hasMoreRows = false; do { - hasMoreRows = scanner.next(results); + hasMoreRows = NextState.hasMoreValues(scanner.next(results)); if (results.size() > 0) { counter++; } @@ -312,7 +313,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { do { results.clear(); - hasMoreRows = scanner.next(results); + hasMoreRows = NextState.hasMoreValues(scanner.next(results)); int listSize = results.size(); for (int i = 0; i < listSize; i++) { sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily, @@ -373,7 +374,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { do { tempVal = null; - hasMoreRows = scanner.next(results); + hasMoreRows = NextState.hasMoreValues(scanner.next(results)); int listSize = results.size(); for (int i = 0; i < listSize; i++) { tempVal = ci.add(tempVal, 
ci.castToReturnType(ci.getValue(colFamily, @@ -440,7 +441,7 @@ extends AggregateService implements CoprocessorService, Coprocessor { do { tempVal = null; tempWeight = null; - hasMoreRows = scanner.next(results); + hasMoreRows = NextState.hasMoreValues(scanner.next(results)); int listSize = results.size(); for (int i = 0; i < listSize; i++) { Cell kv = results.get(i); http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index e082698..4a8e7cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -141,9 +141,8 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor; +import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry; -import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; -import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController; import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory; @@ -5176,7 +5175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected Cell joinedContinuationRow = 
null; protected final byte[] stopRow; private final FilterWrapper filter; - private ScannerContext defaultScannerContext; + private int batch; protected int isScan; private boolean filterClosed = false; private long readPt; @@ -5199,13 +5198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi this.filter = null; } - /** - * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default - * scanner context that can be used to enforce the batch limit in the event that a - * ScannerContext is not specified during an invocation of next/nextRaw - */ - defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build(); - + this.batch = scan.getBatch(); if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) { this.stopRow = null; } else { @@ -5266,7 +5259,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @Override public int getBatch() { - return this.defaultScannerContext.getBatchLimit(); + return this.batch; } /** @@ -5281,14 +5274,19 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } @Override - public boolean next(List<Cell> outResults) + public NextState next(List<Cell> outResults) throws IOException { // apply the batching limit by default - return next(outResults, defaultScannerContext); + return next(outResults, batch); + } + + @Override + public NextState next(List<Cell> outResults, int limit) throws IOException { + return next(outResults, limit, -1); } @Override - public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext) + public synchronized NextState next(List<Cell> outResults, int limit, long remainingResultSize) throws IOException { if (this.filterClosed) { throw new UnknownScannerException("Scanner was closed (timed out?) 
" + @@ -5298,107 +5296,122 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi startRegionOperation(Operation.SCAN); readRequestsCount.increment(); try { - return nextRaw(outResults, scannerContext); + return nextRaw(outResults, limit, remainingResultSize); } finally { closeRegionOperation(Operation.SCAN); } } @Override - public boolean nextRaw(List<Cell> outResults) throws IOException { - // Use the RegionScanner's context by default - return nextRaw(outResults, defaultScannerContext); + public NextState nextRaw(List<Cell> outResults) throws IOException { + return nextRaw(outResults, batch); } @Override - public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext) + public NextState nextRaw(List<Cell> outResults, int limit) + throws IOException { + return nextRaw(outResults, limit, -1); + } + + @Override + public NextState nextRaw(List<Cell> outResults, int batchLimit, long remainingResultSize) throws IOException { if (storeHeap == null) { // scanner is closed throw new UnknownScannerException("Scanner was closed"); } - boolean moreValues; + NextState state; if (outResults.isEmpty()) { // Usually outResults is empty. This is true when next is called // to handle scan or get operation. - moreValues = nextInternal(outResults, scannerContext); + state = nextInternal(outResults, batchLimit, remainingResultSize); } else { List<Cell> tmpList = new ArrayList<Cell>(); - moreValues = nextInternal(tmpList, scannerContext); + state = nextInternal(tmpList, batchLimit, remainingResultSize); outResults.addAll(tmpList); } + // Invalid states should never be returned. Receiving an invalid state means that we have + // no clue how to proceed. Throw an exception. + if (!NextState.isValidState(state)) { + throw new IOException("Invalid state returned from nextInternal. state:" + state); + } // If the size limit was reached it means a partial Result is being returned. 
Returning a // partial Result means that we should not reset the filters; filters should only be reset in // between rows - if (!scannerContext.partialResultFormed()) resetFilters(); + if (!state.sizeLimitReached()) resetFilters(); if (isFilterDoneInternal()) { - moreValues = false; + state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize()); } - return moreValues; + return state; } /** - * @return true if more cells exist after this batch, false if scanner is done + * @return the state the joinedHeap returned on the call to + * {@link KeyValueHeap#next(List, int, long)} */ - private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext) + private NextState populateFromJoinedHeap(List<Cell> results, int limit, long resultSize) throws IOException { assert joinedContinuationRow != null; - boolean moreValues = - populateResult(results, this.joinedHeap, scannerContext, + NextState state = + populateResult(results, this.joinedHeap, limit, resultSize, joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(), joinedContinuationRow.getRowLength()); - - if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { + if (state != null && !state.batchLimitReached() && !state.sizeLimitReached()) { // We are done with this row, reset the continuation. joinedContinuationRow = null; } // As the data is obtained from two independent heaps, we need to // ensure that result list is sorted, because Result relies on that. Collections.sort(results, comparator); - return moreValues; + return state; } /** * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is * reached, or remainingResultSize (if not -1) is reaced * @param heap KeyValueHeap to fetch data from.It must be positioned on correct row before call. - * @param scannerContext + * @param remainingResultSize The remaining space within our result size limit. 
A negative value + * indicate no limit + * @param batchLimit Max amount of KVs to place in result list, -1 means no limit. * @param currentRow Byte array with key we are fetching. * @param offset offset for currentRow * @param length length for currentRow * @return state of last call to {@link KeyValueHeap#next()} */ - private boolean populateResult(List<Cell> results, KeyValueHeap heap, - ScannerContext scannerContext, byte[] currentRow, int offset, short length) - throws IOException { + private NextState populateResult(List<Cell> results, KeyValueHeap heap, int batchLimit, + long remainingResultSize, byte[] currentRow, int offset, short length) throws IOException { Cell nextKv; boolean moreCellsInRow = false; - boolean tmpKeepProgress = scannerContext.getKeepProgress(); - // Scanning between column families and thus the scope is between cells - LimitScope limitScope = LimitScope.BETWEEN_CELLS; + long accumulatedResultSize = 0; + List<Cell> tmpResults = new ArrayList<Cell>(); do { - // We want to maintain any progress that is made towards the limits while scanning across - // different column families. To do this, we toggle the keep progress flag on during calls - // to the StoreScanner to ensure that any progress made thus far is not wiped away. 
- scannerContext.setKeepProgress(true); - heap.next(results, scannerContext); - scannerContext.setKeepProgress(tmpKeepProgress); + int remainingBatchLimit = batchLimit - results.size(); + NextState heapState = + heap.next(tmpResults, remainingBatchLimit, remainingResultSize - accumulatedResultSize); + results.addAll(tmpResults); + accumulatedResultSize += calculateResultSize(tmpResults, heapState); + tmpResults.clear(); + + if (batchLimit > 0 && results.size() == batchLimit) { + return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, accumulatedResultSize); + } nextKv = heap.peek(); moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); - - if (scannerContext.checkBatchLimit(limitScope)) { - return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); - } else if (scannerContext.checkSizeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); + boolean sizeLimitReached = + remainingResultSize > 0 && accumulatedResultSize >= remainingResultSize; + if (moreCellsInRow && sizeLimitReached) { + return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize); } } while (moreCellsInRow); - return nextKv != null; + if (nextKv != null) { + return NextState.makeState(NextState.State.MORE_VALUES, accumulatedResultSize); + } else { + return NextState.makeState(NextState.State.NO_MORE_VALUES, accumulatedResultSize); + } } /** @@ -5416,6 +5429,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length); } + /** + * Calculates the size of the results. If the state of the scanner that these results came from + * indicates that an estimate of the result size has already been generated, we can skip the + * calculation and use that instead. 
+ * @param results List of cells we want to calculate size of + * @param state The state returned from the scanner that generated these results + * @return aggregate size of results + */ + private long calculateResultSize(List<Cell> results, NextState state) { + if (results == null || results.isEmpty()) return 0; + + // In general, the state should contain the estimate because the result size used to + // determine when the scan has exceeded its size limit. If the estimate is contained in the + // state then we can avoid an unnecesasry calculation. + if (state != null && state.hasResultSizeEstimate()) return state.getResultSize(); + + long size = 0; + for (Cell c : results) { + size += CellUtil.estimatedHeapSizeOfWithoutTags(c); + } + + return size; + } + /* * @return True if a filter rules the scanner is over, done. */ @@ -5428,37 +5465,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return this.filter != null && this.filter.filterAllRemaining(); } - private boolean nextInternal(List<Cell> results, ScannerContext scannerContext) + private NextState nextInternal(List<Cell> results, int batchLimit, long remainingResultSize) throws IOException { if (!results.isEmpty()) { throw new IllegalArgumentException("First parameter should be an empty list"); } - if (scannerContext == null) { - throw new IllegalArgumentException("Scanner context cannot be null"); - } + // Estimate of the size (heap size) of the results returned from this method + long resultSize = 0; RpcCallContext rpcCall = RpcServer.getCurrentCall(); - - // Save the initial progress from the Scanner context in these local variables. The progress - // may need to be reset a few times if rows are being filtered out so we save the initial - // progress. 
- int initialBatchProgress = scannerContext.getBatchProgress(); - long initialSizeProgress = scannerContext.getSizeProgress(); - // The loop here is used only when at some point during the next we determine // that due to effects of filters or otherwise, we have an empty row in the result. // Then we loop and try again. Otherwise, we must get out on the first iteration via return, // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row, // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow). while (true) { - // Starting to scan a new row. Reset the scanner progress according to whether or not - // progress should be kept. - if (scannerContext.getKeepProgress()) { - // Progress should be kept. Reset to initial values seen at start of method invocation. - scannerContext.setProgress(initialBatchProgress, initialSizeProgress); - } else { - scannerContext.clearProgress(); - } - if (rpcCall != null) { // If a user specifies a too-restrictive or too-slow scanner, the // client might time out and disconnect while the server side @@ -5486,24 +5506,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } boolean stopRow = isStopRow(currentRow, offset, length); - // When has filter row is true it means that the all the cells for a particular row must be - // read before a filtering decision can be made. This means that filters where hasFilterRow - // run the risk of encountering out of memory errors in the case that they are applied to a - // table that has very large rows. boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow(); // If filter#hasFilterRow is true, partial results are not allowed since allowing them // would prevent the filters from being evaluated. 
Thus, if it is true, change the - // scope of any limits that could potentially create partial results to - // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row - if (hasFilterRow) { + // remainingResultSize to -1 so that the entire row's worth of cells are fetched. + if (hasFilterRow && remainingResultSize > 0) { + remainingResultSize = -1; if (LOG.isTraceEnabled()) { - LOG.trace("filter#hasFilterRow is true which prevents partial results from being " - + " formed. Changing scope of limits that may create partials"); + LOG.trace("filter#hasFilterRow is true which prevents partial results from being " + + " formed. The remainingResultSize of: " + remainingResultSize + " will not " + + " be considered when fetching the cells for this row."); } - scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS); } + NextState joinedHeapState; // Check if we were getting data from the joinedHeap and hit the limit. // If not, then it's main path - getting results from storeHeap. if (joinedContinuationRow == null) { @@ -5512,30 +5529,47 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (hasFilterRow) { filter.filterRowCells(results); } - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); } // Check if rowkey filter wants to exclude this row. If so, loop to next. // Technically, if we hit limits before on this row, we don't need this call. if (filterRowKey(currentRow, offset, length)) { boolean moreRows = nextRow(currentRow, offset, length); - if (!moreRows) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } + if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); results.clear(); continue; } - // Ok, we are good, let's try to get some results from the main heap. 
- populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length); + NextState storeHeapState = + populateResult(results, this.storeHeap, batchLimit, remainingResultSize, currentRow, + offset, length); + resultSize += calculateResultSize(results, storeHeapState); + // Invalid states should never be returned. If one is seen, throw exception + // since we have no way of telling how we should proceed + if (!NextState.isValidState(storeHeapState)) { + throw new IOException("NextState returned from call storeHeap was invalid"); + } - if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { + // Ok, we are good, let's try to get some results from the main heap. + if (storeHeapState.batchLimitReached()) { if (hasFilterRow) { throw new IncompatibleFilterException( - "Filter whose hasFilterRow() returns true is incompatible with scans that must " - + " stop mid-row because of a limit. ScannerContext:" + scannerContext); + "Filter whose hasFilterRow() returns true is incompatible with scan with limit!"); } - return true; + // We hit the batch limit. + return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, resultSize); + } else if (storeHeapState.sizeLimitReached()) { + if (hasFilterRow) { + // We try to guard against this case above when remainingResultSize is set to -1 if + // hasFilterRow is true. In the even that the guard doesn't work, an exception must be + // thrown + throw new IncompatibleFilterException( + "Filter whose hasFilterRows() returns true is incompatible with scans that" + + " return partial results"); + } + // We hit the size limit. 
+ return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); } Cell nextKv = this.storeHeap.peek(); stopRow = nextKv == null || @@ -5548,31 +5582,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED; if (hasFilterRow) { ret = filter.filterRowCellsWithRet(results); - - // We don't know how the results have changed after being filtered. Must set progress - // according to contents of results now. - if (scannerContext.getKeepProgress()) { - scannerContext.setProgress(initialBatchProgress, initialSizeProgress); - } else { - scannerContext.clearProgress(); - } - scannerContext.incrementBatchProgress(results.size()); - for (Cell cell : results) { - scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell)); - } } if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) { results.clear(); boolean moreRows = nextRow(currentRow, offset, length); - if (!moreRows) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } + if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); // This row was totally filtered out, if this is NOT the last row, // we should continue on. Otherwise, nothing else to do. if (!stopRow) continue; - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); } // Ok, we are done with storeHeap for this row. 
@@ -5590,24 +5610,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi currentRow, offset, length)); if (mayHaveData) { joinedContinuationRow = current; - populateFromJoinedHeap(results, scannerContext); - - if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { - return true; + joinedHeapState = + populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); + resultSize += + joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? + joinedHeapState.getResultSize() : 0; + if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) { + return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); } } } } else { // Populating from the joined heap was stopped by limits, populate some more. - populateFromJoinedHeap(results, scannerContext); - if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) { - return true; + joinedHeapState = + populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize); + resultSize += + joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ? + joinedHeapState.getResultSize() : 0; + if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) { + return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize); } } // We may have just called populateFromJoinedMap and hit the limits. If that is // the case, we need to call it again on the next next() invocation. if (joinedContinuationRow != null) { - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + return NextState.makeState(NextState.State.MORE_VALUES, resultSize); } // Finally, we are done with both joinedHeap and storeHeap. @@ -5615,17 +5642,15 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // the case when SingleColumnValueExcludeFilter is used. 
if (results.isEmpty()) { boolean moreRows = nextRow(currentRow, offset, length); - if (!moreRows) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); - } + if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0); if (!stopRow) continue; } // We are done. Return the result. if (stopRow) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize); } else { - return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); + return NextState.makeState(NextState.State.MORE_VALUES, resultSize); } } } @@ -7244,7 +7269,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean done; do { kvs.clear(); - done = scanner.next(kvs); + done = NextState.hasMoreValues(scanner.next(kvs)); if (kvs.size() > 0) LOG.info(kvs); } while (done); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java index f73e363..ea5a75f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java @@ -42,21 +42,218 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; @InterfaceAudience.Private public interface InternalScanner extends Closeable { /** + * This class encapsulates all the meaningful state information that we would like the know about + * after a call to {@link InternalScanner#next(List)}. 
While this is not an enum, a restriction on + * the possible states is implied through the exposed {@link #makeState(State)} method. + */ + public static class NextState { + /** + * The possible states we want to restrict ourselves to. This enum is not sufficient to + * encapsulate all of the state information since some of the fields of the state must be + * dynamic (e.g. resultSize). + */ + public enum State { + MORE_VALUES(true), + NO_MORE_VALUES(false), + SIZE_LIMIT_REACHED(true), + BATCH_LIMIT_REACHED(true); + + private boolean moreValues; + + private State(final boolean moreValues) { + this.moreValues = moreValues; + } + + /** + * @return true when the state indicates that more values may follow those that have been + * returned + */ + public boolean hasMoreValues() { + return this.moreValues; + } + } + + /** + * state variables + */ + private final State state; + private long resultSize; + + /** + * Value to use for resultSize when the size has not been calculated. Must be a negative number + * so that {@link NextState#hasResultSizeEstimate()} returns false. + */ + private static final long DEFAULT_RESULT_SIZE = -1; + + private NextState(State state, long resultSize) { + this.state = state; + this.resultSize = resultSize; + } + + /** + * @param state + * @return An instance of {@link NextState} where the size of the results returned from the call + * to {@link InternalScanner#next(List)} is unknown. It it the responsibility of the + * caller of {@link InternalScanner#next(List)} to calculate the result size if needed + */ + public static NextState makeState(final State state) { + return makeState(state, DEFAULT_RESULT_SIZE); + } + + /** + * @param state + * @param resultSize + * @return An instance of {@link NextState} where the size of the values returned from the call + * to {@link InternalScanner#next(List)} is known. 
The caller can avoid recalculating + * the result size by using the cached value retrievable via {@link #getResultSize()} + */ + public static NextState makeState(final State state, long resultSize) { + switch (state) { + case MORE_VALUES: + return createMoreValuesState(resultSize); + case NO_MORE_VALUES: + return createNoMoreValuesState(resultSize); + case BATCH_LIMIT_REACHED: + return createBatchLimitReachedState(resultSize); + case SIZE_LIMIT_REACHED: + return createSizeLimitReachedState(resultSize); + default: + // If the state is not recognized, default to no more value state + return createNoMoreValuesState(resultSize); + } + } + + /** + * Convenience method for creating a state that indicates that more values can be scanned + * @param resultSize estimate of the size (heap size) of the values returned from the call to + * {@link InternalScanner#next(List)} + */ + private static NextState createMoreValuesState(long resultSize) { + return new NextState(State.MORE_VALUES, resultSize); + } + + /** + * Convenience method for creating a state that indicates that no more values can be scanned. 
+ * @param resultSize estimate of the size (heap size) of the values returned from the call to + * {@link InternalScanner#next(List)} + */ + private static NextState createNoMoreValuesState(long resultSize) { + return new NextState(State.NO_MORE_VALUES, resultSize); + } + + /** + * Convenience method for creating a state that indicates that the scan stopped because the + * batch limit was exceeded + * @param resultSize estimate of the size (heap size) of the values returned from the call to + * {@link InternalScanner#next(List)} + */ + private static NextState createBatchLimitReachedState(long resultSize) { + return new NextState(State.BATCH_LIMIT_REACHED, resultSize); + } + + /** + * Convenience method for creating a state that indicates that the scan stopped due to the size + * limit + * @param resultSize estimate of the size (heap size) of the values returned from the call to + * {@link InternalScanner#next(List)} + */ + private static NextState createSizeLimitReachedState(long resultSize) { + return new NextState(State.SIZE_LIMIT_REACHED, resultSize); + } + + /** + * @return true when the scanner has more values to be scanned following the values returned by + * the call to {@link InternalScanner#next(List)} + */ + public boolean hasMoreValues() { + return this.state.hasMoreValues(); + } + + /** + * @return true when the scanner had to stop scanning because it reached the batch limit + */ + public boolean batchLimitReached() { + return this.state == State.BATCH_LIMIT_REACHED; + } + + /** + * @return true when the scanner had to stop scanning because it reached the size limit + */ + public boolean sizeLimitReached() { + return this.state == State.SIZE_LIMIT_REACHED; + } + + /** + * @return The size (heap size) of the values that were returned from the call to + * {@link InternalScanner#next(List)}. This value should only be used if + * {@link #hasResultSizeEstimate()} returns true. 
+ */ + public long getResultSize() { + return resultSize; + } + + /** + * @return true when an estimate for the size of the values returned by + * {@link InternalScanner#next(List)} was provided. If false, it is the responsibility + * of the caller to calculate the result size + */ + public boolean hasResultSizeEstimate() { + return resultSize >= 0; + } + + @Override + public String toString() { + return "State: " + state + " resultSize: " + resultSize; + } + + /** + * Helper method to centralize all checks as to whether or not the state is valid. + * @param state + * @return true when the state is valid + */ + public static boolean isValidState(NextState state) { + return state != null; + } + + /** + * @param state + * @return true when the state is non null and indicates that more values exist + */ + public static boolean hasMoreValues(NextState state) { + return state != null && state.hasMoreValues(); + } + } + + /** * Grab the next row's worth of values. * @param results return output array - * @return true if more rows exist after this one, false if scanner is done + * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this + * one, false if scanner is done * @throws IOException e */ - boolean next(List<Cell> results) throws IOException; + NextState next(List<Cell> results) throws IOException; /** - * Grab the next row's worth of values. + * Grab the next row's worth of values with a limit on the number of values to return. + * @param result return output array + * @param limit limit on row count to get + * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this + * one, false if scanner is done + * @throws IOException e + */ + NextState next(List<Cell> result, int limit) throws IOException; + + /** + * Grab the next row's worth of values with a limit on the number of values to return as well as a + * restriction on the size of the list of values that are returned. 
* @param result return output array - * @param scannerContext - * @return true if more rows exist after this one, false if scanner is done + * @param limit limit on row count to get + * @param remainingResultSize limit on the size of the result being returned + * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this + * one, false if scanner is done * @throws IOException e */ - boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException; + NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException; /** * Closes the scanner and releases any resources it has allocated http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 6e7788a..beb23cf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -27,7 +27,6 @@ import java.util.PriorityQueue; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue.KVComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState; /** * Implements a heap merge across any number of KeyValueScanners. @@ -129,20 +128,26 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * This can ONLY be called when you are using Scanners that implement InternalScanner as well as * KeyValueScanner (a {@link StoreScanner}). 
* @param result - * @return true if more rows exist after this one, false if scanner is done + * @param limit + * @return state where NextState#hasMoreValues() is true if more keys exist after this + * one, false if scanner is done */ - @Override - public boolean next(List<Cell> result) throws IOException { - return next(result, NoLimitScannerContext.NO_LIMIT); + public NextState next(List<Cell> result, int limit) throws IOException { + return next(result, limit, -1); } - @Override - public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + public NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException { if (this.current == null) { - return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + return NextState.makeState(NextState.State.NO_MORE_VALUES); } InternalScanner currentAsInternal = (InternalScanner)this.current; - boolean moreCells = currentAsInternal.next(result, scannerContext); + NextState state = currentAsInternal.next(result, limit, remainingResultSize); + // Invalid states should never be returned. Receiving an invalid state means that we have + // no clue how to proceed. Throw an exception. + if (!NextState.isValidState(state)) { + throw new IOException("Invalid state returned from InternalScanner#next"); + } + boolean mayContainMoreRows = NextState.hasMoreValues(state); Cell pee = this.current.peek(); /* * By definition, any InternalScanner must return false only when it has no @@ -151,16 +156,31 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner * more efficient to close scanners which are not needed than keep them in * the heap. This is also required for certain optimizations. 
*/ - if (pee == null || !moreCells) { + if (pee == null || !mayContainMoreRows) { this.current.close(); } else { this.heap.add(this.current); } this.current = pollRealKV(); if (this.current == null) { - moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); + state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize()); } - return moreCells; + return state; + } + + /** + * Gets the next row of keys from the top-most scanner. + * <p> + * This method takes care of updating the heap. + * <p> + * This can ONLY be called when you are using Scanners that implement InternalScanner as well as + * KeyValueScanner (a {@link StoreScanner}). + * @param result + * @return state where NextState#hasMoreValues() is true if more keys exist after this + * one, false if scanner is done + */ + public NextState next(List<Cell> result) throws IOException { + return next(result, -1); } protected static class KVScannerComparator implements Comparator<KeyValueScanner> { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java deleted file mode 100644 index 99a371a..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java +++ /dev/null @@ -1,94 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; - -/** - * This is a special {@link ScannerContext} subclass that is designed to be used globally when - * limits should not be enforced during invocations of {@link InternalScanner#next(java.util.List)} - * or {@link RegionScanner#next(java.util.List)}. - * <p> - * Instances of {@link NoLimitScannerContext} are immutable after construction. Any attempt to - * change the limits or progress of a {@link NoLimitScannerContext} will fail silently. The net - * effect is that all limit checks will return false, thus indicating that a limit has not been - * reached. - */ [email protected](HBaseInterfaceAudience.COPROC) [email protected] -public class NoLimitScannerContext extends ScannerContext { - - public NoLimitScannerContext() { - super(false, null); - } - - /** - * Use this instance whenever limits do not need to be enforced. - */ - public static ScannerContext NO_LIMIT = new NoLimitScannerContext(); - - @Override - void setKeepProgress(boolean keepProgress) { - // Do nothing. NoLimitScannerContext instances are immutable post-construction - } - - @Override - void setBatchProgress(int batchProgress) { - // Do nothing. NoLimitScannerContext instances are immutable post-construction - } - - @Override - void setSizeProgress(long sizeProgress) { - // Do nothing. 
NoLimitScannerContext instances are immutable post-construction - } - - @Override - void setProgress(int batchProgress, long sizeProgress) { - // Do nothing. NoLimitScannerContext instances are immutable post-construction - } - - @Override - void setSizeLimitScope(LimitScope scope) { - // Do nothing. NoLimitScannerContext instances are immutable post-construction - } - - @Override - NextState setScannerState(NextState state) { - // Do nothing. NoLimitScannerContext instances are immutable post-construction - return state; - } - - @Override - boolean checkBatchLimit(LimitScope checkerScope) { - // No limits can be specified, thus return false to indicate no limit has been reached. - return false; - } - - @Override - boolean checkSizeLimit(LimitScope checkerScope) { - // No limits can be specified, thus return false to indicate no limit has been reached. - return false; - } - - @Override - boolean checkAnyLimitReached(LimitScope checkerScope) { - return false; - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 10e39a1..1508a15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -105,6 +105,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; +import 
org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; +import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest; @@ -118,8 +120,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfiguratio import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest; -import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath; @@ -151,10 +151,10 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor; import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor; import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; +import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState; import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException; import org.apache.hadoop.hbase.regionserver.Region.FlushResult; import org.apache.hadoop.hbase.regionserver.Region.Operation; -import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope; import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler; import 
org.apache.hadoop.hbase.regionserver.wal.WALEdit; @@ -2236,53 +2236,61 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // correct ordering of partial results and so we prevent partial results from being // formed. boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0; - boolean allowPartialResults = + boolean enforceMaxResultSizeAtCellLevel = clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan; - boolean moreRows = false; - - final LimitScope sizeScope = - allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS; - - // Configure with limits for this RPC. Set keep progress true since size progress - // towards size limit should be kept between calls to nextRaw - ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true); - contextBuilder.setSizeLimit(sizeScope, maxResultSize); - contextBuilder.setBatchLimit(scanner.getBatch()); - ScannerContext scannerContext = contextBuilder.build(); + NextState state = null; while (i < rows) { // Stop collecting results if we have exceeded maxResultSize - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) { + if (currentScanResultSize >= maxResultSize) { builder.setMoreResultsInRegion(true); break; } - // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The - // batch limit is a limit on the number of cells per Result. Thus, if progress is - // being tracked (i.e. scannerContext.keepProgress() is true) then we need to - // reset the batch progress between nextRaw invocations since we don't want the - // batch progress from previous calls to affect future calls - scannerContext.setBatchProgress(0); + // A negative remainingResultSize communicates that there is no limit on the size + // of the results. + final long remainingResultSize = + enforceMaxResultSizeAtCellLevel ? 
maxResultSize - currentScanResultSize + : -1; // Collect values to be returned here - moreRows = scanner.nextRaw(values, scannerContext); - + state = scanner.nextRaw(values, scanner.getBatch(), remainingResultSize); + // Invalid states should never be returned. If one is seen, throw exception + // to stop the scan -- We have no way of telling how we should proceed + if (!NextState.isValidState(state)) { + throw new IOException("NextState returned from call to nextRaw was invalid"); + } if (!values.isEmpty()) { + // The state should always contain an estimate of the result size because that + // estimate must be used to decide when partial results are formed. + boolean skipResultSizeCalculation = state.hasResultSizeEstimate(); + if (skipResultSizeCalculation) currentScanResultSize += state.getResultSize(); + for (Cell cell : values) { totalCellSize += CellUtil.estimatedSerializedSizeOf(cell); + + // If the calculation can't be skipped, then do it now. + if (!skipResultSizeCalculation) { + currentScanResultSize += CellUtil.estimatedHeapSizeOfWithoutTags(cell); + } } - final boolean partial = scannerContext.partialResultFormed(); + // The size limit was reached. This means there are more cells remaining in + // the row but we had to stop because we exceeded our max result size. 
This + // indicates that we are returning a partial result + final boolean partial = state != null && state.sizeLimitReached(); results.add(Result.create(values, null, stale, partial)); i++; } - if (!moreRows) { + if (!NextState.hasMoreValues(state)) { break; } values.clear(); } - - if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS) || i >= rows || - moreRows) { + // currentScanResultSize >= maxResultSize should be functionally equivalent to + // state.sizeLimitReached() + if (null != state + && (currentScanResultSize >= maxResultSize || i >= rows || state + .hasMoreValues())) { // We stopped prematurely builder.setMoreResultsInRegion(true); } else { http://git-wip-us.apache.org/repos/asf/hbase/blob/8cd3001f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index 66e087b..26f9aef 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; * RegionScanner describes iterators over rows in an HRegion. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) -@InterfaceStability.Evolving +@InterfaceStability.Stable public interface RegionScanner extends InternalScanner { /** * @return The RegionInfo for this scanner. @@ -74,22 +74,35 @@ public interface RegionScanner extends InternalScanner { int getBatch(); /** - * Grab the next row's worth of values. This is a special internal method to be called from - * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and - * close a region operation, an synchronize on the scanner object.
Caller should maintain and - * update metrics. See {@link #nextRaw(List, ScannerContext)} + * Grab the next row's worth of values with the default limit on the number of values to return. + * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. + * Caller must set the thread's readpoint, start and close a region operation, an synchronize on + * the scanner object. Caller should maintain and update metrics. See + * {@link #nextRaw(List, int, long)} * @param result return output array - * @return true if more rows exist after this one, false if scanner is done + * @return a state where NextState#hasMoreValues() is true when more rows exist, false when + * scanner is done. * @throws IOException e */ - boolean nextRaw(List<Cell> result) throws IOException; - + NextState nextRaw(List<Cell> result) throws IOException; + + /** + * Grab the next row's worth of values with the default limit on the number of values to return. + * This is a special internal method to be called from coprocessor hooks to avoid expensive setup. + * Caller must set the thread's readpoint, start and close a region operation, an synchronize on + * the scanner object. Caller should maintain and update metrics. See + * {@link #nextRaw(List, int, long)} + * @param result return output array + * @param limit limit on row count to get + * @return a state where NextState#hasMoreValues() is true when more rows exist, false when + * scanner is done. + * @throws IOException e + */ + NextState nextRaw(List<Cell> result, int limit) throws IOException; + /** - * Grab the next row's worth of values. The {@link ScannerContext} is used to enforce and track - * any limits associated with this call. Any progress that exists in the {@link ScannerContext} - * prior to calling this method will be LOST if {@link ScannerContext#getKeepProgress()} is false. 
- * Upon returning from this method, the {@link ScannerContext} will contain information about the - * progress made towards the limits. This is a special internal method to be called from + * Grab the next row's worth of values with a limit on the number of values to return as well as a + * limit on the heap size of those values. This is a special internal method to be called from * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and * close a region operation, an synchronize on the scanner object. Example: <code><pre> * HRegion region = ...; @@ -107,12 +120,13 @@ public interface RegionScanner extends InternalScanner { * } * </pre></code> * @param result return output array - * @param scannerContext The {@link ScannerContext} instance encapsulating all limits that should - * be tracked during calls to this method. The progress towards these limits can be - * tracked within this instance. - * @return true if more rows exist after this one, false if scanner is done + * @param limit limit on row count to get + * @param remainingResultSize the space remaining within the restriction on the result size. + * Negative values indicate no limit + * @return a state where NextState#hasMoreValues() is true when more rows exist, false when + * scanner is done. * @throws IOException e */ - boolean nextRaw(List<Cell> result, ScannerContext scannerContext) + NextState nextRaw(List<Cell> result, int limit, final long remainingResultSize) throws IOException; }
