HBASE-13421 Reduce the number of object creations introduced by HBASE-11544 in scan RPC hot code paths
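This change threads a single, reusable ScannerContext through InternalScanner#next and RegionScanner#nextRaw so that limit and progress state no longer require allocating a fresh NextState object on every call in the scan RPC hot path. A minimal caller-side sketch of the new contract (illustrative only; scanner, scan and results stand in for objects the caller already holds, and the loop shape mirrors the endpoint changes below):

    // Simple callers: next() now returns a plain boolean instead of a NextState.
    boolean hasMore;
    do {
      results.clear();
      // was: hasMore = NextState.hasMoreValues(scanner.next(results));
      hasMore = scanner.next(results);
      // ... process results ...
    } while (hasMore);

    // Callers that enforce limits build a context once and reuse it across calls;
    // limit and progress state live in the context, not in per-call return objects.
    ScannerContext context = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
    boolean moreRows = scanner.nextRaw(results, context);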
Signed-off-by: stack <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/408b9161
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/408b9161
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/408b9161

Branch: refs/heads/branch-1
Commit: 408b9161754966af80be5046fea657769b24f6a0
Parents: 7469426
Author: Jonathan Lawlor <[email protected]>
Authored: Tue Mar 24 15:52:46 2015 -0700
Committer: stack <[email protected]>
Committed: Wed Apr 8 14:03:38 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      |   3 +
 .../client/ScannerCallableWithReplicas.java     |  34 +-
 .../coprocessor/example/BulkDeleteEndpoint.java |   3 +-
 .../coprocessor/example/RowCountEndpoint.java   |   5 +-
 .../hbase/client/ClientSideRegionScanner.java   |   8 +-
 .../coprocessor/AggregateImplementation.java    |  15 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 271 +++++-----
 .../hbase/regionserver/InternalScanner.java     | 209 +-------
 .../hadoop/hbase/regionserver/KeyValueHeap.java |  44 +-
 .../regionserver/NoLimitScannerContext.java     | 102 ++++
 .../hbase/regionserver/RSRpcServices.java       |  66 +--
 .../hbase/regionserver/RegionScanner.java       |  56 +-
 .../hbase/regionserver/ScannerContext.java      | 527 +++++++++++++++++++
 .../hadoop/hbase/regionserver/StoreFlusher.java |   7 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |  78 ++-
 .../regionserver/compactions/Compactor.java     |   7 +-
 .../security/access/AccessControlLists.java     |   3 +-
 .../hbase/security/access/AccessController.java |   6 +-
 .../org/apache/hadoop/hbase/HBaseTestCase.java  |   3 +-
 .../hbase/TestPartialResultsFromClientSide.java |   8 +-
 .../hbase/client/TestIntraRowPagination.java    |   3 +-
 .../hadoop/hbase/client/TestReplicasClient.java | 120 +++--
 .../coprocessor/ColumnAggregationEndpoint.java  |   3 +-
 .../ColumnAggregationEndpointNullResponse.java  |   3 +-
 .../ColumnAggregationEndpointWithErrors.java    |   3 +-
 .../coprocessor/TestCoprocessorInterface.java   |  23 +-
 .../TestRegionObserverInterface.java            |  19 +-
 .../hbase/filter/TestColumnPrefixFilter.java    |   7 +-
 .../hbase/filter/TestDependentColumnFilter.java |   3 +-
 .../apache/hadoop/hbase/filter/TestFilter.java  |  29 +-
 .../filter/TestInvocationRecordFilter.java      |   3 +-
 .../filter/TestMultipleColumnPrefixFilter.java  |   9 +-
 .../hbase/io/encoding/TestPrefixTree.java       |  11 +-
 .../TestScannerSelectionUsingKeyRange.java      |   3 +-
 .../io/hfile/TestScannerSelectionUsingTTL.java  |   3 +-
 .../hbase/regionserver/TestAtomicOperation.java |   7 +-
 .../hbase/regionserver/TestBlocksScanned.java   |   4 +-
 .../hbase/regionserver/TestColumnSeeking.java   |   5 +-
 .../hbase/regionserver/TestDefaultMemStore.java |   9 +-
 .../regionserver/TestGetClosestAtOrBefore.java  |   5 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  | 104 ++--
 .../hbase/regionserver/TestKeepDeletes.java     |   6 +-
 .../hbase/regionserver/TestMajorCompaction.java |   9 +-
 .../regionserver/TestMultiColumnScanner.java    |   3 +-
 .../TestRegionMergeTransaction.java             |   3 +-
 .../regionserver/TestReversibleScanners.java    |   3 +-
 .../regionserver/TestScanWithBloomError.java    |   3 +-
 .../hadoop/hbase/regionserver/TestScanner.java  |  11 +-
 .../regionserver/TestSeekOptimizations.java     |   3 +-
 .../regionserver/TestSplitTransaction.java      |   3 +-
 .../hbase/regionserver/TestStoreScanner.java    |  53 +-
 .../hbase/regionserver/TestStripeCompactor.java |  16 +-
 .../hbase/regionserver/TestWideScanner.java     |   3 +-
 .../compactions/TestStripeCompactionPolicy.java |  18 +-
 .../hbase/regionserver/wal/TestWALReplay.java   |   3 +-
 .../apache/hadoop/hbase/util/TestMergeTool.java |   3 +-
 56 files changed, 1177 insertions(+), 794 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 4800d56..0c28e05 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -402,6 +402,9 @@ public class ClientScanner extends AbstractClientScanner {
           // happens for the cases where we see exceptions. Since only openScanner
           // would have happened, values would be null
           if (values == null && callable.switchedToADifferentReplica()) {
+            // Any accumulated partial results are no longer valid since the callable will
+            // openScanner with the correct startkey and we must pick up from there
+            clearPartialResults();
             this.currentRegion = callable.getHRegionInfo();
             continue;
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index a6c5c11..e32a2d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -292,14 +292,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
         continue; //this was already scheduled earlier
       }
       ScannerCallable s = currentScannerCallable.getScannerCallableForReplica(id);
-
-      if (this.lastResult != null) {
-        if(s.getScan().isReversed()){
-          s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
-        }else {
-          s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
-        }
-      }
+      setStartRowForReplicaCallable(s);
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
       cs.submit(retryingOnReplica, scannerTimeout, id);
@@ -307,6 +300,31 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     return max - min + 1;
   }

+  /**
+   * Set the start row for the replica callable based on the state of the last result received.
+   * @param callable The callable to set the start row on
+   */
+  private void setStartRowForReplicaCallable(ScannerCallable callable) {
+    if (this.lastResult == null || callable == null) return;
+
+    if (this.lastResult.isPartial()) {
+      // The last result was a partial result which means we have not received all of the cells
+      // for this row. Thus, use the last result's row as the start row. If a replica switch
+      // occurs, the scanner will ensure that any accumulated partial results are cleared,
+      // and the scan can resume from this row.
+      callable.getScan().setStartRow(this.lastResult.getRow());
+    } else {
+      // The last result was not a partial result which means it contained all of the cells for
+      // that row (we no longer need any information from it). Set the start row to the next
+      // closest row that could be seen.
+      if (callable.getScan().isReversed()) {
+        callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
+      } else {
+        callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1]));
+      }
+    }
+  }
+
   @VisibleForTesting
   boolean isAnyRPCcancelled() {
     return someRPCcancelled;

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
index e0c3bae..93f98ac 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/BulkDeleteEndpoint.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.BulkDeleteProtos.Bu
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
@@ -137,7 +136,7 @@ public class BulkDeleteEndpoint extends BulkDeleteService implements Coprocessor
         List<List<Cell>> deleteRows = new ArrayList<List<Cell>>(rowBatchSize);
         for (int i = 0; i < rowBatchSize; i++) {
           List<Cell> results = new ArrayList<Cell>();
-          hasMore = NextState.hasMoreValues(scanner.next(results));
+          hasMore = scanner.next(results);
           if (results.size() > 0) {
             deleteRows.add(results);
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
index 2afd05e..4309cdc 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/RowCountEndpoint.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.util.Bytes;

 import com.google.protobuf.RpcCallback;
@@ -81,7 +80,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
       byte[] lastRow = null;
       long count = 0;
       do {
-        hasMore = NextState.hasMoreValues(scanner.next(results));
+        hasMore = scanner.next(results);
         for (Cell kv : results) {
           byte[] currentRow = CellUtil.cloneRow(kv);
           if (lastRow == null || !Bytes.equals(lastRow, currentRow)) {
@@ -120,7 +119,7 @@ public class RowCountEndpoint extends ExampleProtos.RowCountService
       boolean hasMore = false;
       long count = 0;
       do {
-        hasMore = NextState.hasMoreValues(scanner.next(results));
+        hasMore = scanner.next(results);
         for (Cell kv : results) {
           count++;
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index a80a07e..5809983 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;

-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -30,8 +29,10 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.mortbay.log.Log;

@@ -72,10 +73,7 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
   public Result next() throws IOException {
     values.clear();

-    // negative values indicate no limits
-    final long remainingResultSize = -1;
-    final int batchLimit = -1;
-    scanner.nextRaw(values, batchLimit, remainingResultSize);
+    scanner.nextRaw(values, NoLimitScannerContext.getInstance());
     if (values.isEmpty()) {
       //we are done
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index b6f834e..81c933b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateReque
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateService;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;

 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -92,7 +91,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       // qualifier can be null.
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -146,7 +145,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       }
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -200,7 +199,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       List<Cell> results = new ArrayList<Cell>();
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           temp = ci.getValue(colFamily, qualifier, results.get(i));
@@ -254,7 +253,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {
       scanner = env.getRegion().getScanner(scan);
       boolean hasMoreRows = false;
       do {
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         if (results.size() > 0) {
           counter++;
         }
@@ -313,7 +312,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {

       do {
         results.clear();
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           sumVal = ci.add(sumVal, ci.castToReturnType(ci.getValue(colFamily,
@@ -374,7 +373,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {

       do {
         tempVal = null;
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           tempVal = ci.add(tempVal, ci.castToReturnType(ci.getValue(colFamily,
@@ -441,7 +440,7 @@ extends AggregateService implements CoprocessorService, Coprocessor {

       do {
         tempVal = null;
         tempWeight = null;
-        hasMoreRows = NextState.hasMoreValues(scanner.next(results));
+        hasMoreRows = scanner.next(results);
         int listSize = results.size();
         for (int i = 0; i < listSize; i++) {
           Cell kv = results.get(i);

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 259fb9a..8103445 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -141,8 +141,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor.Stor
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
@@ -5175,7 +5176,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     protected Cell joinedContinuationRow = null;
     protected final byte[] stopRow;
     private final FilterWrapper filter;
-    private int batch;
+    private ScannerContext defaultScannerContext;
     protected int isScan;
     private boolean filterClosed = false;
    private long readPt;
@@ -5198,7 +5199,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         this.filter = null;
       }

-      this.batch = scan.getBatch();
+      /**
+       * By default, calls to next/nextRaw must enforce the batch limit. Thus, construct a default
+       * scanner context that can be used to enforce the batch limit in the event that a
+       * ScannerContext is not specified during an invocation of next/nextRaw
+       */
+      defaultScannerContext = ScannerContext.newBuilder().setBatchLimit(scan.getBatch()).build();
+
       if (Bytes.equals(scan.getStopRow(), HConstants.EMPTY_END_ROW) && !scan.isGetScan()) {
         this.stopRow = null;
       } else {
@@ -5259,7 +5266,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi

     @Override
     public int getBatch() {
-      return this.batch;
+      return this.defaultScannerContext.getBatchLimit();
     }

     /**
@@ -5274,19 +5281,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }

     @Override
-    public NextState next(List<Cell> outResults)
+    public boolean next(List<Cell> outResults)
         throws IOException {
       // apply the batching limit by default
-      return next(outResults, batch);
-    }
-
-    @Override
-    public NextState next(List<Cell> outResults, int limit) throws IOException {
-      return next(outResults, limit, -1);
+      return next(outResults, defaultScannerContext);
     }

     @Override
-    public synchronized NextState next(List<Cell> outResults, int limit, long remainingResultSize)
+    public synchronized boolean next(List<Cell> outResults, ScannerContext scannerContext)
         throws IOException {
       if (this.filterClosed) {
         throw new UnknownScannerException("Scanner was closed (timed out?) " +
@@ -5296,122 +5298,107 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       startRegionOperation(Operation.SCAN);
       readRequestsCount.increment();
       try {
-        return nextRaw(outResults, limit, remainingResultSize);
+        return nextRaw(outResults, scannerContext);
       } finally {
         closeRegionOperation(Operation.SCAN);
       }
     }

     @Override
-    public NextState nextRaw(List<Cell> outResults) throws IOException {
-      return nextRaw(outResults, batch);
+    public boolean nextRaw(List<Cell> outResults) throws IOException {
+      // Use the RegionScanner's context by default
+      return nextRaw(outResults, defaultScannerContext);
     }

     @Override
-    public NextState nextRaw(List<Cell> outResults, int limit)
-        throws IOException {
-      return nextRaw(outResults, limit, -1);
-    }
-
-    @Override
-    public NextState nextRaw(List<Cell> outResults, int batchLimit, long remainingResultSize)
+    public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
         throws IOException {
       if (storeHeap == null) {
         // scanner is closed
         throw new UnknownScannerException("Scanner was closed");
       }
-      NextState state;
+      boolean moreValues;
       if (outResults.isEmpty()) {
         // Usually outResults is empty. This is true when next is called
         // to handle scan or get operation.
-        state = nextInternal(outResults, batchLimit, remainingResultSize);
+        moreValues = nextInternal(outResults, scannerContext);
       } else {
         List<Cell> tmpList = new ArrayList<Cell>();
-        state = nextInternal(tmpList, batchLimit, remainingResultSize);
+        moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-      // Invalid states should never be returned. Receiving an invalid state means that we have
-      // no clue how to proceed. Throw an exception.
-      if (!NextState.isValidState(state)) {
-        throw new IOException("Invalid state returned from nextInternal. state:" + state);
-      }

       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
-      if (!state.sizeLimitReached()) resetFilters();
+      if (!scannerContext.partialResultFormed()) resetFilters();

       if (isFilterDoneInternal()) {
-        state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
+        moreValues = false;
       }
-      return state;
+      return moreValues;
     }

     /**
-     * @return the state the joinedHeap returned on the call to
-     *         {@link KeyValueHeap#next(List, int, long)}
+     * @return true if more cells exist after this batch, false if scanner is done
     */
-    private NextState populateFromJoinedHeap(List<Cell> results, int limit, long resultSize)
+    private boolean populateFromJoinedHeap(List<Cell> results, ScannerContext scannerContext)
        throws IOException {
       assert joinedContinuationRow != null;
-      NextState state =
-          populateResult(results, this.joinedHeap, limit, resultSize,
+      boolean moreValues =
+          populateResult(results, this.joinedHeap, scannerContext,
              joinedContinuationRow.getRowArray(), joinedContinuationRow.getRowOffset(),
              joinedContinuationRow.getRowLength());
-      if (state != null && !state.batchLimitReached() && !state.sizeLimitReached()) {
+
+      if (!scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
         // We are done with this row, reset the continuation.
         joinedContinuationRow = null;
       }
       // As the data is obtained from two independent heaps, we need to
       // ensure that result list is sorted, because Result relies on that.
       Collections.sort(results, comparator);
-      return state;
+      return moreValues;
     }

     /**
     * Fetches records with currentRow into results list, until next row, batchLimit (if not -1) is
     * reached, or remainingResultSize (if not -1) is reached
     * @param heap KeyValueHeap to fetch data from. It must be positioned on correct row before call.
-     * @param remainingResultSize The remaining space within our result size limit. A negative value
-     *          indicate no limit
-     * @param batchLimit Max amount of KVs to place in result list, -1 means no limit.
+     * @param scannerContext
     * @param currentRow Byte array with key we are fetching.
     * @param offset offset for currentRow
     * @param length length for currentRow
     * @return state of last call to {@link KeyValueHeap#next()}
     */
-    private NextState populateResult(List<Cell> results, KeyValueHeap heap, int batchLimit,
-        long remainingResultSize, byte[] currentRow, int offset, short length) throws IOException {
+    private boolean populateResult(List<Cell> results, KeyValueHeap heap,
+        ScannerContext scannerContext, byte[] currentRow, int offset, short length)
+        throws IOException {
       Cell nextKv;
       boolean moreCellsInRow = false;
-      long accumulatedResultSize = 0;
-      List<Cell> tmpResults = new ArrayList<Cell>();
+      boolean tmpKeepProgress = scannerContext.getKeepProgress();
+      // Scanning between column families and thus the scope is between cells
+      LimitScope limitScope = LimitScope.BETWEEN_CELLS;
       do {
-        int remainingBatchLimit = batchLimit - results.size();
-        NextState heapState =
-            heap.next(tmpResults, remainingBatchLimit, remainingResultSize - accumulatedResultSize);
-        results.addAll(tmpResults);
-        accumulatedResultSize += calculateResultSize(tmpResults, heapState);
-        tmpResults.clear();
-
-        if (batchLimit > 0 && results.size() == batchLimit) {
-          return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, accumulatedResultSize);
-        }
+        // We want to maintain any progress that is made towards the limits while scanning across
+        // different column families. To do this, we toggle the keep progress flag on during calls
+        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+        scannerContext.setKeepProgress(true);
+        heap.next(results, scannerContext);
+        scannerContext.setKeepProgress(tmpKeepProgress);

         nextKv = heap.peek();
         moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-        boolean sizeLimitReached =
-            remainingResultSize > 0 && accumulatedResultSize >= remainingResultSize;
-        if (moreCellsInRow && sizeLimitReached) {
-          return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, accumulatedResultSize);
+
+        if (scannerContext.checkBatchLimit(limitScope)) {
+          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+        } else if (scannerContext.checkSizeLimit(limitScope)) {
+          ScannerContext.NextState state =
+              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+          return scannerContext.setScannerState(state).hasMoreValues();
         }
       } while (moreCellsInRow);
-      if (nextKv != null) {
-        return NextState.makeState(NextState.State.MORE_VALUES, accumulatedResultSize);
-      } else {
-        return NextState.makeState(NextState.State.NO_MORE_VALUES, accumulatedResultSize);
-      }
+      return nextKv != null;
     }

     /**
@@ -5429,30 +5416,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return nextKv != null && CellUtil.matchingRow(nextKv, currentRow, offset, length);
     }

-    /**
-     * Calculates the size of the results. If the state of the scanner that these results came from
-     * indicates that an estimate of the result size has already been generated, we can skip the
-     * calculation and use that instead.
-     * @param results List of cells we want to calculate size of
-     * @param state The state returned from the scanner that generated these results
-     * @return aggregate size of results
-     */
-    private long calculateResultSize(List<Cell> results, NextState state) {
-      if (results == null || results.isEmpty()) return 0;
-
-      // In general, the state should contain the estimate because the result size used to
-      // determine when the scan has exceeded its size limit. If the estimate is contained in the
-      // state then we can avoid an unnecesasry calculation.
-      if (state != null && state.hasResultSizeEstimate()) return state.getResultSize();
-
-      long size = 0;
-      for (Cell c : results) {
-        size += CellUtil.estimatedHeapSizeOfWithoutTags(c);
-      }
-
-      return size;
-    }
-
     /*
     * @return True if a filter rules the scanner is over, done.
     */
@@ -5465,20 +5428,37 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       return this.filter != null && this.filter.filterAllRemaining();
     }

-    private NextState nextInternal(List<Cell> results, int batchLimit, long remainingResultSize)
+    private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
        throws IOException {
       if (!results.isEmpty()) {
         throw new IllegalArgumentException("First parameter should be an empty list");
       }
-      // Estimate of the size (heap size) of the results returned from this method
-      long resultSize = 0;
+      if (scannerContext == null) {
+        throw new IllegalArgumentException("Scanner context cannot be null");
+      }
       RpcCallContext rpcCall = RpcServer.getCurrentCall();
+
+      // Save the initial progress from the Scanner context in these local variables. The progress
+      // may need to be reset a few times if rows are being filtered out so we save the initial
+      // progress.
+      int initialBatchProgress = scannerContext.getBatchProgress();
+      long initialSizeProgress = scannerContext.getSizeProgress();
+
      // The loop here is used only when at some point during the next we determine
      // that due to effects of filters or otherwise, we have an empty row in the result.
      // Then we loop and try again. Otherwise, we must get out on the first iteration via return,
      // "true" if there's more data to read, "false" if there isn't (storeHeap is at a stop row,
      // and joinedHeap has no more data to read for the last row (if set, joinedContinuationRow).
       while (true) {
+        // Starting to scan a new row. Reset the scanner progress according to whether or not
+        // progress should be kept.
+        if (scannerContext.getKeepProgress()) {
+          // Progress should be kept. Reset to initial values seen at start of method invocation.
+          scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+        } else {
+          scannerContext.clearProgress();
+        }
+
         if (rpcCall != null) {
           // If a user specifies a too-restrictive or too-slow scanner, the
           // client might time out and disconnect while the server side
@@ -5506,21 +5486,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         }

         boolean stopRow = isStopRow(currentRow, offset, length);
+        // When hasFilterRow is true, it means that all of the cells for a particular row must be
+        // read before a filtering decision can be made. This means that filters where hasFilterRow
+        // is true run the risk of encountering out of memory errors in the case that they are
+        // applied to a table that has very large rows.
         boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();

        // If filter#hasFilterRow is true, partial results are not allowed since allowing them
        // would prevent the filters from being evaluated. Thus, if it is true, change the
-        // remainingResultSize to -1 so that the entire row's worth of cells are fetched.
-        if (hasFilterRow && remainingResultSize > 0) {
-          remainingResultSize = -1;
+        // scope of any limits that could potentially create partial results to
+        // LimitScope.BETWEEN_ROWS so that those limits are not reached mid-row
+        if (hasFilterRow) {
           if (LOG.isTraceEnabled()) {
-            LOG.trace("filter#hasFilterRow is true which prevents partial results from being " +
-                " formed. The remainingResultSize of: " + remainingResultSize + " will not " +
-                " be considered when fetching the cells for this row.");
+            LOG.trace("filter#hasFilterRow is true which prevents partial results from being "
+                + " formed. Changing scope of limits that may create partials");
           }
+          scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
         }

-        NextState joinedHeapState;
         // Check if we were getting data from the joinedHeap and hit the limit.
         // If not, then it's main path - getting results from storeHeap.
         if (joinedContinuationRow == null) {
@@ -5529,47 +5512,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             if (hasFilterRow) {
               filter.filterRowCells(results);
             }
-            return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }

           // Check if rowkey filter wants to exclude this row. If so, loop to next.
           // Technically, if we hit limits before on this row, we don't need this call.
           if (filterRowKey(currentRow, offset, length)) {
             boolean moreRows = nextRow(currentRow, offset, length);
-            if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
+            if (!moreRows) {
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }
             results.clear();
             continue;
           }

-          NextState storeHeapState =
-              populateResult(results, this.storeHeap, batchLimit, remainingResultSize, currentRow,
-                offset, length);
-          resultSize += calculateResultSize(results, storeHeapState);
-          // Invalid states should never be returned. If one is seen, throw exception
-          // since we have no way of telling how we should proceed
-          if (!NextState.isValidState(storeHeapState)) {
-            throw new IOException("NextState returned from call storeHeap was invalid");
-          }
-
-          // Ok, we are good, let's try to get some results from the main heap.
-          if (storeHeapState.batchLimitReached()) {
-            if (hasFilterRow) {
-              throw new IncompatibleFilterException(
-                "Filter whose hasFilterRow() returns true is incompatible with scan with limit!");
-            }
-            // We hit the batch limit.
-            return NextState.makeState(NextState.State.BATCH_LIMIT_REACHED, resultSize);
-          } else if (storeHeapState.sizeLimitReached()) {
+          populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);
+
+          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
             if (hasFilterRow) {
-              // We try to guard against this case above when remainingResultSize is set to -1 if
-              // hasFilterRow is true. In the even that the guard doesn't work, an exception must be
-              // thrown
               throw new IncompatibleFilterException(
-                "Filter whose hasFilterRows() returns true is incompatible with scans that"
-                    + " return partial results");
+                  "Filter whose hasFilterRow() returns true is incompatible with scans that must "
+                      + " stop mid-row because of a limit. ScannerContext:" + scannerContext);
             }
-            // We hit the size limit.
-            return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
+            return true;
           }

           Cell nextKv = this.storeHeap.peek();
           stopRow = nextKv == null ||
@@ -5582,17 +5548,31 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
           if (hasFilterRow) {
             ret = filter.filterRowCellsWithRet(results);
+
+            // We don't know how the results have changed after being filtered. Must set progress
+            // according to contents of results now.
+            if (scannerContext.getKeepProgress()) {
+              scannerContext.setProgress(initialBatchProgress, initialSizeProgress);
+            } else {
+              scannerContext.clearProgress();
+            }
+            scannerContext.incrementBatchProgress(results.size());
+            for (Cell cell : results) {
+              scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));
+            }
           }

           if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {
             results.clear();
             boolean moreRows = nextRow(currentRow, offset, length);
-            if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+            if (!moreRows) {
+              return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+            }

             // This row was totally filtered out, if this is NOT the last row,
             // we should continue on. Otherwise, nothing else to do.
             if (!stopRow) continue;
-            return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }

           // Ok, we are done with storeHeap for this row.
@@ -5610,31 +5590,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
                  currentRow, offset, length));
              if (mayHaveData) {
                joinedContinuationRow = current;
-                joinedHeapState =
-                    populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize);
-                resultSize +=
-                    joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ?
-                        joinedHeapState.getResultSize() : 0;
-                if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) {
-                  return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
+                populateFromJoinedHeap(results, scannerContext);
+
+                if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
+                  return true;
                }
              }
            }
         } else {
           // Populating from the joined heap was stopped by limits, populate some more.
-          joinedHeapState =
-              populateFromJoinedHeap(results, batchLimit, remainingResultSize - resultSize);
-          resultSize +=
-              joinedHeapState != null && joinedHeapState.hasResultSizeEstimate() ?
-                  joinedHeapState.getResultSize() : 0;
-          if (joinedHeapState != null && joinedHeapState.sizeLimitReached()) {
-            return NextState.makeState(NextState.State.SIZE_LIMIT_REACHED, resultSize);
+          populateFromJoinedHeap(results, scannerContext);
+          if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {
+            return true;
           }
         }

         // We may have just called populateFromJoinedMap and hit the limits. If that is
         // the case, we need to call it again on the next next() invocation.
         if (joinedContinuationRow != null) {
-          return NextState.makeState(NextState.State.MORE_VALUES, resultSize);
+          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
         }

         // Finally, we are done with both joinedHeap and storeHeap.
@@ -5642,15 +5615,17 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // the case when SingleColumnValueExcludeFilter is used.
         if (results.isEmpty()) {
           boolean moreRows = nextRow(currentRow, offset, length);
-          if (!moreRows) return NextState.makeState(NextState.State.NO_MORE_VALUES, 0);
+          if (!moreRows) {
+            return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
+          }
           if (!stopRow) continue;
         }

         // We are done. Return the result.
         if (stopRow) {
-          return NextState.makeState(NextState.State.NO_MORE_VALUES, resultSize);
+          return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
         } else {
-          return NextState.makeState(NextState.State.MORE_VALUES, resultSize);
+          return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
         }
       }
     }
@@ -7372,7 +7347,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean done;
       do {
         kvs.clear();
-        done = NextState.hasMoreValues(scanner.next(kvs));
+        done = scanner.next(kvs);
         if (kvs.size() > 0) LOG.info(kvs);
       } while (done);
     } finally {

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
index ea5a75f..f73e363 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/InternalScanner.java
@@ -42,218 +42,21 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public interface InternalScanner extends Closeable {
   /**
-   * This class encapsulates all the meaningful state information that we would like the know about
-   * after a call to {@link InternalScanner#next(List)}. While this is not an enum, a restriction on
-   * the possible states is implied through the exposed {@link #makeState(State)} method.
-   */
-  public static class NextState {
-    /**
-     * The possible states we want to restrict ourselves to. This enum is not sufficient to
-     * encapsulate all of the state information since some of the fields of the state must be
-     * dynamic (e.g. resultSize).
-     */
-    public enum State {
-      MORE_VALUES(true),
-      NO_MORE_VALUES(false),
-      SIZE_LIMIT_REACHED(true),
-      BATCH_LIMIT_REACHED(true);
-
-      private boolean moreValues;
-
-      private State(final boolean moreValues) {
-        this.moreValues = moreValues;
-      }
-
-      /**
-       * @return true when the state indicates that more values may follow those that have been
-       *         returned
-       */
-      public boolean hasMoreValues() {
-        return this.moreValues;
-      }
-    }
-
-    /**
-     * state variables
-     */
-    private final State state;
-    private long resultSize;
-
-    /**
-     * Value to use for resultSize when the size has not been calculated. Must be a negative number
-     * so that {@link NextState#hasResultSizeEstimate()} returns false.
-     */
-    private static final long DEFAULT_RESULT_SIZE = -1;
-
-    private NextState(State state, long resultSize) {
-      this.state = state;
-      this.resultSize = resultSize;
-    }
-
-    /**
-     * @param state
-     * @return An instance of {@link NextState} where the size of the results returned from the call
-     *         to {@link InternalScanner#next(List)} is unknown. It it the
-     *         responsibility of the caller of {@link InternalScanner#next(List)} to calculate the
-     *         result size if needed
-     */
-    public static NextState makeState(final State state) {
-      return makeState(state, DEFAULT_RESULT_SIZE);
-    }
-
-    /**
-     * @param state
-     * @param resultSize
-     * @return An instance of {@link NextState} where the size of the values returned from the call
-     *         to {@link InternalScanner#next(List)} is known. The caller can avoid recalculating
-     *         the result size by using the cached value retrievable via {@link #getResultSize()}
-     */
-    public static NextState makeState(final State state, long resultSize) {
-      switch (state) {
-      case MORE_VALUES:
-        return createMoreValuesState(resultSize);
-      case NO_MORE_VALUES:
-        return createNoMoreValuesState(resultSize);
-      case BATCH_LIMIT_REACHED:
-        return createBatchLimitReachedState(resultSize);
-      case SIZE_LIMIT_REACHED:
-        return createSizeLimitReachedState(resultSize);
-      default:
-        // If the state is not recognized, default to no more value state
-        return createNoMoreValuesState(resultSize);
-      }
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that more values can be scanned
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createMoreValuesState(long resultSize) {
-      return new NextState(State.MORE_VALUES, resultSize);
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that no more values can be scanned.
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createNoMoreValuesState(long resultSize) {
-      return new NextState(State.NO_MORE_VALUES, resultSize);
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that the scan stopped because the
-     * batch limit was exceeded
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createBatchLimitReachedState(long resultSize) {
-      return new NextState(State.BATCH_LIMIT_REACHED, resultSize);
-    }
-
-    /**
-     * Convenience method for creating a state that indicates that the scan stopped due to the size
-     * limit
-     * @param resultSize estimate of the size (heap size) of the values returned from the call to
-     *          {@link InternalScanner#next(List)}
-     */
-    private static NextState createSizeLimitReachedState(long resultSize) {
-      return new NextState(State.SIZE_LIMIT_REACHED, resultSize);
-    }
-
-    /**
-     * @return true when the scanner has more values to be scanned following the values returned by
-     *         the call to {@link InternalScanner#next(List)}
-     */
-    public boolean hasMoreValues() {
-      return this.state.hasMoreValues();
-    }
-
-    /**
-     * @return true when the scanner had to stop scanning because it reached the batch limit
-     */
-    public boolean batchLimitReached() {
-      return this.state == State.BATCH_LIMIT_REACHED;
-    }
-
-    /**
-     * @return true when the scanner had to stop scanning because it reached the size limit
-     */
-    public boolean sizeLimitReached() {
-      return this.state == State.SIZE_LIMIT_REACHED;
-    }
-
-    /**
-     * @return The size (heap size) of the values that were returned from the call to
-     *         {@link InternalScanner#next(List)}. This value should only be used if
-     *         {@link #hasResultSizeEstimate()} returns true.
-     */
-    public long getResultSize() {
-      return resultSize;
-    }
-
-    /**
-     * @return true when an estimate for the size of the values returned by
-     *         {@link InternalScanner#next(List)} was provided. If false, it is the responsibility
-     *         of the caller to calculate the result size
-     */
-    public boolean hasResultSizeEstimate() {
-      return resultSize >= 0;
-    }
-
-    @Override
-    public String toString() {
-      return "State: " + state + " resultSize: " + resultSize;
-    }
-
-    /**
-     * Helper method to centralize all checks as to whether or not the state is valid.
-     * @param state
-     * @return true when the state is valid
-     */
-    public static boolean isValidState(NextState state) {
-      return state != null;
-    }
-
-    /**
-     * @param state
-     * @return true when the state is non null and indicates that more values exist
-     */
-    public static boolean hasMoreValues(NextState state) {
-      return state != null && state.hasMoreValues();
-    }
-  }
-
-  /**
+  /**
   * Grab the next row's worth of values.
   * @param results return output array
-   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
-   *         one, false if scanner is done
+   * @return true if more rows exist after this one, false if scanner is done
   * @throws IOException e
   */
-  NextState next(List<Cell> results) throws IOException;
+  boolean next(List<Cell> results) throws IOException;

  /**
-   * Grab the next row's worth of values with a limit on the number of values to return.
-   * @param result return output array
-   * @param limit limit on row count to get
-   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
-   *         one, false if scanner is done
-   * @throws IOException e
-   */
-  NextState next(List<Cell> result, int limit) throws IOException;
-
-  /**
-   * Grab the next row's worth of values with a limit on the number of values to return as well as a
-   * restriction on the size of the list of values that are returned.
+   * Grab the next row's worth of values.
   * @param result return output array
-   * @param limit limit on row count to get
-   * @param remainingResultSize limit on the size of the result being returned
-   * @return state where {@link NextState#hasMoreValues()} is true if more rows exist after this
-   *         one, false if scanner is done
+   * @param scannerContext
+   * @return true if more rows exist after this one, false if scanner is done
   * @throws IOException e
   */
-  NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException;
+  boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException;

  /**
   * Closes the scanner and releases any resources it has allocated

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
index beb23cf..761267f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
@@ -27,6 +27,7 @@ import java.util.PriorityQueue;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;

 /**
  * Implements a heap merge across any number of KeyValueScanners.
@@ -128,26 +129,20 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
   * KeyValueScanner (a {@link StoreScanner}).
   * @param result
-   * @param limit
-   * @return state where NextState#hasMoreValues() is true if more keys exist after this
-   *         one, false if scanner is done
+   * @return true if more rows exist after this one, false if scanner is done
   */
-  public NextState next(List<Cell> result, int limit) throws IOException {
-    return next(result, limit, -1);
+  @Override
+  public boolean next(List<Cell> result) throws IOException {
+    return next(result, NoLimitScannerContext.getInstance());
  }

-  public NextState next(List<Cell> result, int limit, long remainingResultSize) throws IOException {
+  @Override
+  public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
    if (this.current == null) {
-      return NextState.makeState(NextState.State.NO_MORE_VALUES);
+      return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }
    InternalScanner currentAsInternal = (InternalScanner)this.current;
-    NextState state = currentAsInternal.next(result, limit, remainingResultSize);
-    // Invalid states should never be returned. Receiving an invalid state means that we have
-    // no clue how to proceed. Throw an exception.
-    if (!NextState.isValidState(state)) {
-      throw new IOException("Invalid state returned from InternalScanner#next");
-    }
-    boolean mayContainMoreRows = NextState.hasMoreValues(state);
+    boolean moreCells = currentAsInternal.next(result, scannerContext);
    Cell pee = this.current.peek();
    /*
     * By definition, any InternalScanner must return false only when it has no
@@ -156,31 +151,16 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
     * more efficient to close scanners which are not needed than keep them in
     * the heap. This is also required for certain optimizations.
     */
-    if (pee == null || !mayContainMoreRows) {
+    if (pee == null || !moreCells) {
      this.current.close();
    } else {
      this.heap.add(this.current);
    }
    this.current = pollRealKV();
    if (this.current == null) {
-      state = NextState.makeState(NextState.State.NO_MORE_VALUES, state.getResultSize());
+      moreCells = scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
    }
-    return state;
-  }
-
-  /**
-   * Gets the next row of keys from the top-most scanner.
-   * <p>
-   * This method takes care of updating the heap.
-   * <p>
-   * This can ONLY be called when you are using Scanners that implement InternalScanner as well as
-   * KeyValueScanner (a {@link StoreScanner}).
-   * @param result
-   * @return state where NextState#hasMoreValues() is true if more keys exist after this
-   *         one, false if scanner is done
-   */
-  public NextState next(List<Cell> result) throws IOException {
-    return next(result, -1);
+    return moreCells;
  }

  protected static class KVScannerComparator implements Comparator<KeyValueScanner> {

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
new file mode 100644
index 0000000..1484e80
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * This is a special {@link ScannerContext} subclass that is designed to be used globally when
+ * limits should not be enforced during invocations of {@link InternalScanner#next(java.util.List)}
+ * or {@link RegionScanner#next(java.util.List)}.
+ * <p>
+ * Instances of {@link NoLimitScannerContext} are immutable after construction. Any attempt to
+ * change the limits or progress of a {@link NoLimitScannerContext} will fail silently. The net
+ * effect is that all limit checks will return false, thus indicating that a limit has not been
+ * reached.
+ */
[email protected](HBaseInterfaceAudience.COPROC)
[email protected]
+public class NoLimitScannerContext extends ScannerContext {
+
+  public NoLimitScannerContext() {
+    super(false, null);
+  }
+
+  /**
+   * Use this instance whenever limits do not need to be enforced.
+   */
+  private static final ScannerContext NO_LIMIT = new NoLimitScannerContext();
+
+  /**
+   * @return The static, immutable instance of {@link NoLimitScannerContext} to be used whenever
+   *         limits should not be enforced
+   */
+  public static final ScannerContext getInstance() {
+    return NO_LIMIT;
+  }
+
+  @Override
+  void setKeepProgress(boolean keepProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setBatchProgress(int batchProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setSizeProgress(long sizeProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setProgress(int batchProgress, long sizeProgress) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  void setSizeLimitScope(LimitScope scope) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+  }
+
+  @Override
+  NextState setScannerState(NextState state) {
+    // Do nothing. NoLimitScannerContext instances are immutable post-construction
+    return state;
+  }
+
+  @Override
+  boolean checkBatchLimit(LimitScope checkerScope) {
+    // No limits can be specified, thus return false to indicate no limit has been reached.
+    return false;
+  }
+
+  @Override
+  boolean checkSizeLimit(LimitScope checkerScope) {
+    // No limits can be specified, thus return false to indicate no limit has been reached.
+    return false;
+  }
+
+  @Override
+  boolean checkAnyLimitReached(LimitScope checkerScope) {
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 6dbf684..48b2b3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -108,8 +108,6 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.RollWALWriterRequest;
@@ -123,6 +121,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateConfiguratio
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WarmupRegionResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequest.FamilyPath;
@@ -154,10 +154,10 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.quotas.OperationQuota;
 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
-import org.apache.hadoop.hbase.regionserver.InternalScanner.NextState;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.Region.FlushResult;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -2259,61 +2259,53 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
              // correct ordering of partial results and so we prevent partial results from being
              // formed.
              boolean serverGuaranteesOrderOfPartials = currentScanResultSize == 0;
-              boolean enforceMaxResultSizeAtCellLevel =
+              boolean allowPartialResults =
                  clientHandlesPartials && serverGuaranteesOrderOfPartials && !isSmallScan;
-              NextState state = null;
+              boolean moreRows = false;
+
+              final LimitScope sizeScope =
+                  allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
+
+              // Configure with limits for this RPC. Set keep progress true since size progress
+              // towards size limit should be kept between calls to nextRaw
+              ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
+              contextBuilder.setSizeLimit(sizeScope, maxResultSize);
+              contextBuilder.setBatchLimit(scanner.getBatch());
+              ScannerContext scannerContext = contextBuilder.build();

              while (i < rows) {
                // Stop collecting results if we have exceeded maxResultSize
-                if (currentScanResultSize >= maxResultSize) {
+                if (scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS)) {
                  builder.setMoreResultsInRegion(true);
                  break;
                }

-                // A negative remainingResultSize communicates that there is no limit on the size
-                // of the results.
-                final long remainingResultSize =
-                    enforceMaxResultSizeAtCellLevel ? maxResultSize - currentScanResultSize
-                        : -1;
+                // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
+                // batch limit is a limit on the number of cells per Result. Thus, if progress is
+                // being tracked (i.e. scannerContext.keepProgress() is true) then we need to
+                // reset the batch progress between nextRaw invocations since we don't want the
+                // batch progress from previous calls to affect future calls
+                scannerContext.setBatchProgress(0);

                // Collect values to be returned here
-                state = scanner.nextRaw(values, scanner.getBatch(), remainingResultSize);
-                // Invalid states should never be returned. If one is seen, throw exception
http://git-wip-us.apache.org/repos/asf/hbase/blob/408b9161/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index 0002851..66e087b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
  * RegionScanner describes iterators over rows in an HRegion.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
[email protected]
[email protected]
 public interface RegionScanner extends InternalScanner {
   /**
    * @return The RegionInfo for this scanner.
@@ -74,41 +74,22 @@ public interface RegionScanner extends InternalScanner {
   int getBatch();

   /**
-   * Grab the next row's worth of values with the default limit on the number of values to return.
-   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-<<<<<<< HEAD
-   * Caller must set the thread's readpoint, start and close a region operation, an
-   * synchronize on the scanner object. Caller should maintain and update metrics.
-   * See {@link #nextRaw(List, int)}
-=======
-   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on
-   * the scanner object. Caller should maintain and update metrics. See
-   * {@link #nextRaw(List, int, long)}
->>>>>>> 3b6145e... HBASE-11544: [Ergonomics] hbase.client.scanner.caching is dogged and will try to return batch even if it means OOME
-   * @param result return output array
-   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
-   *         scanner is done.
-   * @throws IOException e
-   */
-  NextState nextRaw(List<Cell> result) throws IOException;
-
-  /**
-   * Grab the next row's worth of values with the default limit on the number of values to return.
-   * This is a special internal method to be called from coprocessor hooks to avoid expensive setup.
-   * Caller must set the thread's readpoint, start and close a region operation, an synchronize on
-   * the scanner object. Caller should maintain and update metrics. See
-   * {@link #nextRaw(List, int, long)}
+   * Grab the next row's worth of values. This is a special internal method to be called from
+   * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
+   * close a region operation, and synchronize on the scanner object. Caller should maintain and
+   * update metrics. See {@link #nextRaw(List, ScannerContext)}
    * @param result return output array
-   * @param limit limit on row count to get
-   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
-   *         scanner is done.
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException e
    */
-  NextState nextRaw(List<Cell> result, int limit) throws IOException;
-
+  boolean nextRaw(List<Cell> result) throws IOException;
+
   /**
-   * Grab the next row's worth of values with a limit on the number of values to return as well as a
-   * limit on the heap size of those values. This is a special internal method to be called from
+   * Grab the next row's worth of values. The {@link ScannerContext} is used to enforce and track
+   * any limits associated with this call. Any progress that exists in the {@link ScannerContext}
+   * prior to calling this method will be LOST if {@link ScannerContext#getKeepProgress()} is false.
+   * Upon returning from this method, the {@link ScannerContext} will contain information about the
+   * progress made towards the limits. This is a special internal method to be called from
    * coprocessor hooks to avoid expensive setup. Caller must set the thread's readpoint, start and
    * close a region operation, an synchronize on the scanner object. Example: <code><pre>
    * HRegion region = ...;
@@ -126,13 +107,12 @@ public interface RegionScanner extends InternalScanner {
    * }
    * </pre></code>
    * @param result return output array
-   * @param limit limit on row count to get
-   * @param remainingResultSize the space remaining within the restriction on the result size.
-   *          Negative values indicate no limit
-   * @return a state where NextState#hasMoreValues() is true when more rows exist, false when
-   *         scanner is done.
+   * @param scannerContext The {@link ScannerContext} instance encapsulating all limits that should
+   *          be tracked during calls to this method. The progress towards these limits can be
+   *          tracked within this instance.
+   * @return true if more rows exist after this one, false if scanner is done
    * @throws IOException e
    */
-  NextState nextRaw(List<Cell> result, int limit, final long remainingResultSize)
+  boolean nextRaw(List<Cell> result, ScannerContext scannerContext)
       throws IOException;
 }
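For callers, the net effect of the interface change above is that the loop condition is now the
returned boolean and all limit bookkeeping lives in the context object. The sketch below shows the
coprocessor-style calling convention the javadoc describes, with lease, readpoint, and metrics
handling elided; it assumes the HBase 1.1 (branch-1) classpath.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;

// Hedged sketch of a raw-scan caller; error handling beyond the required
// region-operation bracketing is omitted.
final class RawScanCaller {
  static List<List<Cell>> drain(HRegion region, RegionScanner scanner) throws IOException {
    List<List<Cell>> rows = new ArrayList<List<Cell>>();
    region.startRegionOperation();   // caller must bracket the region operation
    try {
      synchronized (scanner) {       // caller must synchronize on the scanner
        boolean moreRows = true;
        while (moreRows) {
          List<Cell> row = new ArrayList<Cell>();
          moreRows = scanner.nextRaw(row);  // true means more rows follow this one
          if (!row.isEmpty()) {
            rows.add(row);
          }
        }
      }
    } finally {
      region.closeRegionOperation();
    }
    return rows;
  }
}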
