HBASE-17508 Unify the implementation of small scan and regular scan for sync client
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4456d228 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4456d228 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4456d228 Branch: refs/heads/branch-1 Commit: 4456d22859d7e1821f0b8b9cf3acdb8564b3cd09 Parents: fb12397 Author: zhangduo <[email protected]> Authored: Sat Feb 4 21:14:31 2017 +0800 Committer: zhangduo <[email protected]> Committed: Sun Feb 5 08:49:51 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/ClientScanner.java | 305 +++++---- .../client/ClientSmallReversedScanner.java | 345 ---------- .../hadoop/hbase/client/ClientSmallScanner.java | 317 --------- .../hadoop/hbase/client/ConnectionManager.java | 15 +- .../hadoop/hbase/client/ConnectionUtils.java | 39 +- .../org/apache/hadoop/hbase/client/HTable.java | 18 +- .../hbase/client/ReversedClientScanner.java | 25 +- .../hbase/client/ReversedScannerCallable.java | 2 +- .../org/apache/hadoop/hbase/client/Scan.java | 68 ++ .../hadoop/hbase/client/ScannerCallable.java | 296 +++++---- .../client/ScannerCallableWithReplicas.java | 31 +- .../hadoop/hbase/protobuf/ProtobufUtil.java | 96 ++- .../hadoop/hbase/protobuf/RequestConverter.java | 8 +- .../hadoop/hbase/client/TestClientScanner.java | 112 ++-- .../client/TestClientSmallReversedScanner.java | 349 ---------- .../hbase/client/TestClientSmallScanner.java | 339 ---------- .../hbase/protobuf/generated/ClientProtos.java | 655 ++++++++++++++++--- hbase-protocol/src/main/protobuf/Client.proto | 11 + .../hadoop/hbase/mapreduce/SyncTable.java | 165 +++-- .../hbase/regionserver/RSRpcServices.java | 46 +- .../org/apache/hadoop/hbase/tool/Canary.java | 5 +- .../hadoop/hbase/HBaseTestingUtility.java | 25 +- .../hbase/TestMetaTableAccessorNoCluster.java | 2 +- .../hbase/TestPartialResultsFromClientSide.java | 12 +- .../client/TestClientScannerRPCTimeout.java | 5 + 
.../hadoop/hbase/client/TestFromClientSide.java | 2 +- .../hadoop/hbase/client/TestLeaseRenewal.java | 4 +- .../regionserver/TestRegionServerMetrics.java | 2 +- .../regionserver/TestScannerWithBulkload.java | 6 +- .../security/access/TestAccessController.java | 1 - .../security/access/TestAccessController2.java | 3 + 31 files changed, 1308 insertions(+), 2001 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index fb2bc4b..40b5002 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.*; import com.google.common.annotations.VisibleForTesting; import java.io.IOException; @@ -27,6 +28,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; +import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -42,6 +44,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import 
org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -57,9 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes; @InterfaceAudience.Private public class ClientScanner extends AbstractClientScanner { private static final Log LOG = LogFactory.getLog(ClientScanner.class); - // A byte array in which all elements are the max byte, and it is used to - // construct closest front row - static byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); + protected Scan scan; protected boolean closed = false; // Current region scanner is against. Gets cleared if current region goes @@ -156,12 +157,6 @@ public class ClientScanner extends AbstractClientScanner { this.rpcControllerFactory = controllerFactory; this.conf = conf; - initializeScannerInConstruction(); - } - - protected void initializeScannerInConstruction() throws IOException { - // initialize the scanner - nextScanner(this.caching, false); } protected ClusterConnection getConnection() { @@ -242,31 +237,30 @@ public class ClientScanner extends AbstractClientScanner { this.callable = null; } } - /* + + /** * Gets a scanner for the next region. If this.currentRegion != null, then we will move to the - * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). We will go no - * further, just tidy up outstanding scanners, if <code>currentRegion != null</code> and - * <code>done</code> is true. - * @param nbRows - * @param done Server-side says we're done scanning. + * endrow of this.currentRegion. Else we will get scanner at the scan.getStartRow(). + * @param nbRows the caching option of the scan + * @return the results fetched when open scanner, or null which means terminate the scan. 
*/ - protected boolean nextScanner(int nbRows, final boolean done) throws IOException { + protected Result[] nextScanner(int nbRows) throws IOException { // Close the previous scanner if it's open closeScanner(); // Where to start the next scanner byte[] localStartKey; - // if we're at end of table, close and return false to stop iterating + // if we're at end of table, close and return null to stop iterating if (this.currentRegion != null) { byte[] endKey = this.currentRegion.getEndKey(); - if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(endKey) || done) { + if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) || + checkScanStopRow(endKey)) { close(); if (LOG.isTraceEnabled()) { LOG.trace("Finished " + this.currentRegion); } - return false; + return null; } localStartKey = endKey; // clear mvcc read point if we are going to switch regions @@ -287,16 +281,23 @@ public class ClientScanner extends AbstractClientScanner { callable = getScannerCallable(localStartKey, nbRows); // Open a scanner on the region server starting at the // beginning of the region - call(callable, caller, scannerTimeout); + Result[] rrs = call(callable, caller, scannerTimeout); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } + if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { + // no results for the scan, return null to terminate the scan. 
+ closed = true; + callable = null; + currentRegion = null; + return null; + } + return rrs; } catch (IOException e) { - close(); + closeScanner(); throw e; } - return true; } @VisibleForTesting @@ -304,9 +305,8 @@ public class ClientScanner extends AbstractClientScanner { return callable.isAnyRPCcancelled(); } - Result[] call(ScannerCallableWithReplicas callable, - RpcRetryingCaller<Result[]> caller, int scannerTimeout) - throws IOException, RuntimeException { + private Result[] call(ScannerCallableWithReplicas callable, RpcRetryingCaller<Result[]> caller, + int scannerTimeout) throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException(); } @@ -369,22 +369,19 @@ public class ClientScanner extends AbstractClientScanner { return cache != null ? cache.size() : 0; } + private boolean scanExhausted(Result[] values) { + // This means the server tells us the whole scan operation is done. Usually decided by filter or + // limit. + return values == null || callable.moreResultsForScan() == MoreResults.NO; + } + private boolean regionExhausted(Result[] values) { - // This means the server tells us the whole scan operation is done. Usually decided by filter. - if (values == null) { - return true; - } - // Not a heartbeat message and we get nothing, this means the region is exhausted - if (values.length == 0 && !callable.isHeartbeatMessage()) { - return true; - } - // Server tells us that it has no more results for this region. Notice that this flag is get - // from the ScanResponse.getMoreResultsInRegion, not ScanResponse.getMoreResults. If the latter - // one is false then we will get a null values and quit in the first condition of this method. - if (callable.hasMoreResultsContext() && !callable.getServerHasMoreResults()) { - return true; - } - return false; + // 1. Not a heartbeat message and we get nothing, this means the region is exhausted. 
And in the + // old time we always return empty result for a open scanner operation so we add a check here to + // keep compatible with the old logic. Should remove the isOpenScanner in the future. + // 2. Server tells us that it has no more results for this region. + return (values.length == 0 && !callable.isHeartbeatMessage() && !callable.isOpenScanner()) + || callable.moreResultsInRegion() == MoreResults.NO; } private void closeScannerIfExhausted(boolean exhausted) throws IOException { @@ -400,16 +397,87 @@ public class ClientScanner extends AbstractClientScanner { } } + private Result[] nextScannerWithRetries(int nbRows) throws IOException { + for (;;) { + try { + return nextScanner(nbRows); + } catch (DoNotRetryIOException e) { + handleScanError(e, null); + } + } + } + + private void handleScanError(DoNotRetryIOException e, + MutableBoolean retryAfterOutOfOrderException) throws DoNotRetryIOException { + // An exception was thrown which makes any partial results that we were collecting + // invalid. The scanner will need to be reset to the beginning of a row. + clearPartialResults(); + // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us + // to reset the scanner and come back in again. + + // If exception is any but the list below throw it back to the client; else setup + // the scanner and retry. + Throwable cause = e.getCause(); + if ((cause != null && cause instanceof NotServingRegionException) || + (cause != null && cause instanceof RegionServerStoppedException) || + e instanceof OutOfOrderScannerNextException || e instanceof UnknownScannerException || + e instanceof ScannerResetException) { + // Pass. It is easier writing the if loop test as list of what is allowed rather than + // as a list of what is not allowed... so if in here, it means we do not throw. + } else { + throw e; + } + + // Else, its signal from depths of ScannerCallable that we need to reset the scanner. 
+ if (this.lastResult != null) { + // The region has moved. We need to open a brand new scanner at the new location. + // Reset the startRow to the row we've seen last so that the new scanner starts at + // the correct row. Otherwise we may see previously returned rows again. + // (ScannerCallable by now has "relocated" the correct region) + if (!this.lastResult.isPartial() && scan.getBatch() < 0) { + if (scan.isReversed()) { + scan.setStartRow(createClosestRowBefore(lastResult.getRow())); + } else { + scan.setStartRow(createClosestRowAfter(lastResult.getRow())); + } + } else { + // we need rescan this row because we only loaded partial row before + scan.setStartRow(lastResult.getRow()); + } + } + if (e instanceof OutOfOrderScannerNextException) { + if (retryAfterOutOfOrderException != null) { + if (retryAfterOutOfOrderException.isTrue()) { + retryAfterOutOfOrderException.setValue(false); + } else { + // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? + throw new DoNotRetryIOException( + "Failed after retry of OutOfOrderScannerNextException: was there a rpc timeout?", e); + } + } + } + // Clear region. + this.currentRegion = null; + // Set this to zero so we don't try and do an rpc and close on remote server when + // the exception we got was UnknownScanner or the Server is going down. + callable = null; + } + /** * Contact the servers to load more {@link Result}s in the cache. */ protected void loadCache() throws IOException { + // check if scanner was closed during previous prefetch + if (closed) { + return; + } Result[] values = null; long remainingResultSize = maxScannerResultSize; int countdown = this.caching; // This is possible if we just stopped at the boundary of a region in the previous call. 
if (callable == null) { - if (!nextScanner(countdown, false)) { + values = nextScannerWithRetries(countdown); + if (values == null) { return; } } @@ -417,80 +485,39 @@ public class ClientScanner extends AbstractClientScanner { callable.setCaching(this.caching); // This flag is set when we want to skip the result returned. We do // this when we reset scanner because it split under us. - boolean retryAfterOutOfOrderException = true; + MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true); for (;;) { try { // Server returns a null values if scanning is to stop. Else, // returns an empty array if scanning is to go on and we've just // exhausted current region. - values = call(callable, caller, scannerTimeout); + // now we will also fetch data when openScanner, so do not make a next call again if values + // is already non-null. + if (values == null) { + values = call(callable, caller, scannerTimeout); + } // When the replica switch happens, we need to do certain operations again. // The callable will openScanner with the right startkey but we need to pick up // from there. Bypass the rest of the loop and let the catch-up happen in the beginning // of the loop as it happens for the cases where we see exceptions. - // Since only openScanner would have happened, values would be null - if (values == null && callable.switchedToADifferentReplica()) { + if (callable.switchedToADifferentReplica()) { // Any accumulated partial results are no longer valid since the callable will // openScanner with the correct startkey and we must pick up from there clearPartialResults(); this.currentRegion = callable.getHRegionInfo(); - continue; - } - retryAfterOutOfOrderException = true; - } catch (DoNotRetryIOException | NeedUnmanagedConnectionException e) { - // An exception was thrown which makes any partial results that we were collecting - // invalid. The scanner will need to be reset to the beginning of a row. 
- clearPartialResults(); - // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us - // to reset the scanner and come back in again. - - // If exception is any but the list below throw it back to the client; else setup - // the scanner and retry. - Throwable cause = e.getCause(); - if ((cause != null && cause instanceof NotServingRegionException) || - (cause != null && cause instanceof RegionServerStoppedException) || - e instanceof OutOfOrderScannerNextException || - e instanceof UnknownScannerException || - e instanceof ScannerResetException) { - // Pass. It is easier writing the if loop test as list of what is allowed rather than - // as a list of what is not allowed... so if in here, it means we do not throw. - } else { - throw e; - } - - // Else, its signal from depths of ScannerCallable that we need to reset the scanner. - if (this.lastResult != null) { - // The region has moved. We need to open a brand new scanner at the new location. - // Reset the startRow to the row we've seen last so that the new scanner starts at - // the correct row. Otherwise we may see previously returned rows again. - // (ScannerCallable by now has "relocated" the correct region) - if (!this.lastResult.isPartial() && scan.getBatch() < 0) { - if (scan.isReversed()) { - scan.setStartRow(createClosestRowBefore(lastResult.getRow())); - } else { - scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1])); - } - } else { - // we need rescan this row because we only loaded partial row before - scan.setStartRow(lastResult.getRow()); - } - } - if (e instanceof OutOfOrderScannerNextException) { - if (retryAfterOutOfOrderException) { - retryAfterOutOfOrderException = false; - } else { - // TODO: Why wrap this in a DNRIOE when it already is a DNRIOE? 
- throw new DoNotRetryIOException("Failed after retry of " + - "OutOfOrderScannerNextException: was there a rpc timeout?", e); + // Now we will also fetch data when openScanner so usually we should not get a null + // result, but at some places we still use null to indicate the scan is terminated, so add + // a sanity check here. Should be removed later. + if (values == null) { + continue; } } - // Clear region. - this.currentRegion = null; - // Set this to zero so we don't try and do an rpc and close on remote server when - // the exception we got was UnknownScanner or the Server is going down. - callable = null; + retryAfterOutOfOrderException.setValue(true); + } catch (DoNotRetryIOException e) { + handleScanError(e, retryAfterOutOfOrderException); // reopen the scanner - if (!nextScanner(countdown, false)) { + values = nextScannerWithRetries(countdown); + if (values == null) { break; } continue; @@ -523,8 +550,18 @@ public class ClientScanner extends AbstractClientScanner { this.lastCellLoadedToCache = null; } } + if (scan.getLimit() > 0) { + int limit = scan.getLimit() - resultsToAddToCache.size(); + assert limit >= 0; + scan.setLimit(limit); + } } - boolean exhausted = regionExhausted(values); + if (scanExhausted(values)) { + closeScanner(); + closed = true; + break; + } + boolean regionExhausted = regionExhausted(values); if (callable.isHeartbeatMessage()) { if (!cache.isEmpty()) { // Caller of this method just wants a Result. If we see a heartbeat message, it means @@ -542,12 +579,12 @@ public class ClientScanner extends AbstractClientScanner { } if (countdown <= 0) { // we have enough result. 
- closeScannerIfExhausted(exhausted); + closeScannerIfExhausted(regionExhausted); break; } if (remainingResultSize <= 0) { if (!cache.isEmpty()) { - closeScannerIfExhausted(exhausted); + closeScannerIfExhausted(regionExhausted); break; } else { // we have reached the max result size but we still can not find anything to return to the @@ -556,17 +593,21 @@ public class ClientScanner extends AbstractClientScanner { } } // we are done with the current region - if (exhausted) { + if (regionExhausted) { if (!partialResults.isEmpty()) { // XXX: continue if there are partial results. But in fact server should not set // hasMoreResults to false if there are partial results. LOG.warn("Server tells us there is no more results for this region but we still have" + " partialResults, this should not happen, retry on the current scanner anyway"); + values = null; // reset values for the next call continue; } - if (!nextScanner(countdown, values == null)) { + values = nextScannerWithRetries(countdown); + if (values == null) { break; } + } else { + values = null; // reset values for the next call } } } @@ -738,46 +779,24 @@ public class ClientScanner extends AbstractClientScanner { } } - @Override - public void close() { - if (!scanMetricsPublished) writeScanMetrics(); - if (callable != null) { - callable.setClose(); - try { - call(callable, caller, scannerTimeout); - } catch (UnknownScannerException e) { - // We used to catch this error, interpret, and rethrow. However, we - // have since decided that it's not nice for a scanner's close to - // throw exceptions. Chances are it was just due to lease time out. - } catch (IOException e) { - /* An exception other than UnknownScanner is unexpected. */ - LOG.warn("scanner failed to close. 
Exception follows: " + e); - } - callable = null; + @Override + public void close() { + if (!scanMetricsPublished) writeScanMetrics(); + if (callable != null) { + callable.setClose(); + try { + call(callable, caller, scannerTimeout); + } catch (UnknownScannerException e) { + // We used to catch this error, interpret, and rethrow. However, we + // have since decided that it's not nice for a scanner's close to + // throw exceptions. Chances are it was just due to lease time out. + } catch (IOException e) { + /* An exception other than UnknownScanner is unexpected. */ + LOG.warn("scanner failed to close. Exception follows: " + e); } - closed = true; - } - - /** - * Create the closest row before the specified row - * @param row - * @return a new byte array which is the closest front row of the specified one - */ - protected static byte[] createClosestRowBefore(byte[] row) { - if (row == null) { - throw new IllegalArgumentException("The passed row is empty"); - } - if (Bytes.equals(row, HConstants.EMPTY_BYTE_ARRAY)) { - return MAX_BYTE_ARRAY; - } - if (row[row.length - 1] == 0) { - return Arrays.copyOf(row, row.length - 1); - } else { - byte[] closestFrontRow = Arrays.copyOf(row, row.length); - closestFrontRow[row.length - 1] = (byte) ((closestFrontRow[row.length - 1] & 0xff) - 1); - closestFrontRow = Bytes.add(closestFrontRow, MAX_BYTE_ARRAY); - return closestFrontRow; + callable = null; } + closed = true; } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java deleted file mode 100644 index bd5575a..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallReversedScanner.java 
+++ /dev/null @@ -1,345 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package org.apache.hadoop.hbase.client; - - -import com.google.protobuf.ServiceException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.ClientSmallScanner.SmallScannerCallable; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.annotations.VisibleForTesting; - -import java.io.IOException; -import java.io.InterruptedIOException; -import 
java.util.concurrent.ExecutorService; - -/** - * <p> - * Client scanner for small reversed scan. Generally, only one RPC is called to fetch the - * scan results, unless the results cross multiple regions or the row count of - * results exceed the caching. - * </p> - * For small scan, it will get better performance than {@link ReversedClientScanner} - */ [email protected] -public class ClientSmallReversedScanner extends ReversedClientScanner { - private static final Log LOG = LogFactory.getLog(ClientSmallReversedScanner.class); - private ScannerCallableWithReplicas smallReversedScanCallable = null; - private SmallReversedScannerCallableFactory callableFactory; - - /** - * Create a new ReversibleClientScanner for the specified table. Take note that the passed - * {@link Scan} 's start row maybe changed changed. - * - * @param conf - * The {@link Configuration} to use. - * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @throws IOException - * If the remote call fails - */ - public ClientSmallReversedScanner(final Configuration conf, final Scan scan, - final TableName tableName, ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout, new SmallReversedScannerCallableFactory()); - } - - /** - * Create a new ReversibleClientScanner for the specified table. Take note that the passed - * {@link Scan}'s start row may be changed. 
- * - * @param conf - * The {@link Configuration} to use. - * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @param callableFactory - * Factory used to create the {@link SmallScannerCallable} - * @throws IOException - * If the remote call fails - */ - @VisibleForTesting - ClientSmallReversedScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - SmallReversedScannerCallableFactory callableFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); - this.callableFactory = callableFactory; - } - - /** - * Gets a scanner for following scan. Move to next region or continue from the last result or - * start from the start row. - * - * @param nbRows - * @param done - * true if Server-side says we're done scanning. 
- * @param currentRegionDone - * true if scan is over on current region - * @return true if has next scanner - * @throws IOException - */ - private boolean nextScanner(int nbRows, final boolean done, - boolean currentRegionDone) throws IOException { - // Where to start the next getter - byte[] localStartKey; - int cacheNum = nbRows; - boolean regionChanged = true; - boolean isFirstRegionToLocate = false; - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null && currentRegionDone) { - byte[] startKey = this.currentRegion.getStartKey(); - if (startKey == null - || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(startKey) || done) { - close(); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished with small scan at " + this.currentRegion); - } - return false; - } - // We take the row just under to get to the previous region. - localStartKey = createClosestRowBefore(startKey); - if (LOG.isDebugEnabled()) { - LOG.debug("Finished with region " + this.currentRegion); - } - } else if (this.lastResult != null) { - regionChanged = false; - localStartKey = createClosestRowBefore(lastResult.getRow()); - } else { - localStartKey = this.scan.getStartRow(); - isFirstRegionToLocate = true; - } - - if (!isFirstRegionToLocate - && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) { - // when non-firstRegion & localStartKey is empty bytes, no more rowKey should scan. - // otherwise, maybe infinity results with RowKey=0x00 will return. 
- return false; - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Advancing internal small scanner to startKey at '" - + Bytes.toStringBinary(localStartKey) + "'"); - } - - smallReversedScanCallable = - callableFactory.getCallable(getConnection(), getTable(), scan, getScanMetrics(), - localStartKey, cacheNum, rpcControllerFactory, getPool(), getPrimaryOperationTimeout(), - getRetries(), getScannerTimeout(), getConf(), caller, isFirstRegionToLocate); - - if (this.scanMetrics != null && regionChanged) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - return true; - } - - @Override - public Result next() throws IOException { - // If the scanner is closed and there's nothing left in the cache, next is a - // no-op. - if (cache.size() == 0 && this.closed) { - return null; - } - if (cache.size() == 0) { - loadCache(); - } - - if (cache.size() > 0) { - return cache.poll(); - } - // if we exhausted this scanner before calling close, write out the scan - // metrics - writeScanMetrics(); - return null; - } - - @Override - protected void loadCache() throws IOException { - Result[] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; - boolean currentRegionDone = false; - // Values == null means server-side filter has determined we must STOP - while (remainingResultSize > 0 && countdown > 0 - && nextScanner(countdown, values == null, currentRegionDone)) { - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - // callWithoutRetries is at this layer. 
Within the ScannerCallableWithReplicas, - // we do a callWithRetries - values = this.caller.callWithoutRetries(smallReversedScanCallable, scannerTimeout); - this.currentRegion = smallReversedScanCallable.getHRegionInfo(); - long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null) { - this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - - lastNext); - } - lastNext = currentTime; - if (values != null && values.length > 0) { - for (int i = 0; i < values.length; i++) { - Result rs = values[i]; - cache.add(rs); - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - } - } - if (smallReversedScanCallable.hasMoreResultsContext()) { - currentRegionDone = !smallReversedScanCallable.getServerHasMoreResults(); - } else { - currentRegionDone = countdown > 0; - } - } - } - - @Override - protected void initializeScannerInConstruction() throws IOException { - // No need to initialize the scanner when constructing instance, do it when - // calling next(). Do nothing here. - } - - @Override - public void close() { - if (!scanMetricsPublished) writeScanMetrics(); - closed = true; - } - - @VisibleForTesting - protected void setScannerCallableFactory(SmallReversedScannerCallableFactory callableFactory) { - this.callableFactory = callableFactory; - } - - /** - * A reversed ScannerCallable which supports backward small scanning. 
- */ - static class SmallReversedScannerCallable extends ReversedScannerCallable { - - public SmallReversedScannerCallable(ClusterConnection connection, TableName table, Scan scan, - ScanMetrics scanMetrics, byte[] locateStartRow, RpcControllerFactory controllerFactory, - int caching, int replicaId) { - super(connection, table, scan, scanMetrics, locateStartRow, controllerFactory, replicaId); - this.setCaching(caching); - } - - @Override - public Result[] call(int timeout) throws IOException { - if (this.closed) return null; - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } - ClientProtos.ScanRequest request = RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), getScan(), getCaching(), true); - ClientProtos.ScanResponse response = null; - controller = controllerFactory.newController(); - try { - controller.setPriority(getTableName()); - controller.setCallTimeout(timeout); - response = getStub().scan(controller, request); - Result[] results = ResponseConverter.getResults(controller.cellScanner(), response); - if (response.hasMoreResultsInRegion()) { - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - setHasMoreResultsContext(false); - } - // We need to update result metrics since we are overriding call() - updateResultsMetrics(results); - return results; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - - @Override - public ScannerCallable getScannerCallableForReplica(int id) { - return new SmallReversedScannerCallable(getConnection(), getTableName(), getScan(), - scanMetrics, locateStartRow, controllerFactory, getCaching(), id); - } - } - - protected static class SmallReversedScannerCallableFactory { - - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, 
ExecutorService pool, int primaryOperationTimeout, - int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller, - boolean isFirstRegionToLocate) { - byte[] locateStartRow = null; - if (isFirstRegionToLocate - && (localStartKey == null || Bytes.equals(localStartKey, HConstants.EMPTY_BYTE_ARRAY))) { - // HBASE-16886: if not setting startRow, then we will use a range [MAX_BYTE_ARRAY, +oo) to - // locate a region list, and the last one in region list is the region where our scan start. - locateStartRow = ClientScanner.MAX_BYTE_ARRAY; - } - - scan.setStartRow(localStartKey); - SmallReversedScannerCallable s = new SmallReversedScannerCallable(connection, table, scan, - scanMetrics, locateStartRow, controllerFactory, cacheNum, 0); - ScannerCallableWithReplicas scannerCallableWithReplicas = - new ScannerCallableWithReplicas(table, connection, s, pool, primaryOperationTimeout, scan, - retries, scannerTimeout, cacheNum, conf, caller); - return scannerCallableWithReplicas; - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java deleted file mode 100644 index b1554fd..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java +++ /dev/null @@ -1,317 +0,0 @@ -/** - * Copyright The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. 
The ASF - * licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.concurrent.ExecutorService; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.ResponseConverter; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.common.annotations.VisibleForTesting; -import com.google.protobuf.ServiceException; - -/** - * Client scanner for small scan. Generally, only one RPC is called to fetch the - * scan results, unless the results cross multiple regions or the row count of - * results excess the caching. 
- * - * For small scan, it will get better performance than {@link ClientScanner} - */ [email protected] -public class ClientSmallScanner extends ClientScanner { - private static final Log LOG = LogFactory.getLog(ClientSmallScanner.class); - private ScannerCallableWithReplicas smallScanCallable = null; - private SmallScannerCallableFactory callableFactory; - - /** - * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan} - * 's start row maybe changed changed. - * - * @param conf - * The {@link Configuration} to use. - * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @throws IOException - * If the remote call fails - */ - public ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { - this(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout, new SmallScannerCallableFactory()); - } - - /** - * Create a new ShortClientScanner for the specified table. Take note that the passed {@link Scan} - * 's start row maybe changed changed. Intended for unit tests to provide their own - * {@link SmallScannerCallableFactory} implementation/mock. - * - * @param conf - * The {@link Configuration} to use. 
- * @param scan - * {@link Scan} to use in this scanner - * @param tableName - * The table that we wish to rangeGet - * @param connection - * Connection identifying the cluster - * @param rpcFactory - * Factory used to create the {@link RpcRetryingCaller} - * @param controllerFactory - * Factory used to access RPC payloads - * @param pool - * Threadpool for RPC threads - * @param primaryOperationTimeout - * Call timeout - * @param callableFactory - * Factory used to create the {@link SmallScannerCallable} - * @throws IOException - */ - @VisibleForTesting - ClientSmallScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - SmallScannerCallableFactory callableFactory) throws IOException { - super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); - this.callableFactory = callableFactory; - } - - @Override - protected void initializeScannerInConstruction() throws IOException { - // No need to initialize the scanner when constructing instance, do it when - // calling next(). Do nothing here. - } - - /** - * Gets a scanner for following scan. Move to next region or continue from the - * last result or start from the start row. - * @param nbRows - * @param done true if Server-side says we're done scanning. 
- * @param currentRegionDone true if scan is over on current region - * @return true if has next scanner - * @throws IOException - */ - private boolean nextScanner(int nbRows, final boolean done, - boolean currentRegionDone) throws IOException { - // Where to start the next getter - byte[] localStartKey; - int cacheNum = nbRows; - boolean regionChanged = true; - // if we're at end of table, close and return false to stop iterating - if (this.currentRegion != null && currentRegionDone) { - byte[] endKey = this.currentRegion.getEndKey(); - if (endKey == null || Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(endKey) || done) { - close(); - if (LOG.isTraceEnabled()) { - LOG.trace("Finished with small scan at " + this.currentRegion); - } - return false; - } - localStartKey = endKey; - if (LOG.isTraceEnabled()) { - LOG.trace("Finished with region " + this.currentRegion); - } - } else if (this.lastResult != null) { - regionChanged = false; - localStartKey = Bytes.add(lastResult.getRow(), new byte[1]); - } else { - localStartKey = this.scan.getStartRow(); - } - - if (LOG.isTraceEnabled()) { - LOG.trace("Advancing internal small scanner to startKey at '" - + Bytes.toStringBinary(localStartKey) + "'"); - } - smallScanCallable = callableFactory.getCallable(getConnection(), getTable(), scan, - getScanMetrics(), localStartKey, cacheNum, rpcControllerFactory, getPool(), - getPrimaryOperationTimeout(), getRetries(), getScannerTimeout(), getConf(), caller); - if (this.scanMetrics != null && regionChanged) { - this.scanMetrics.countOfRegions.incrementAndGet(); - } - return true; - } - - static class SmallScannerCallable extends ScannerCallable { - public SmallScannerCallable( - ClusterConnection connection, TableName table, Scan scan, - ScanMetrics scanMetrics, RpcControllerFactory controllerFactory, int caching, int id) { - super(connection, table, scan, scanMetrics, controllerFactory, id); - this.setCaching(caching); - } - - @Override - public Result[] 
call(int timeout) throws IOException { - if (this.closed) return null; - if (Thread.interrupted()) { - throw new InterruptedIOException(); - } - ScanRequest request = RequestConverter.buildScanRequest(getLocation() - .getRegionInfo().getRegionName(), getScan(), getCaching(), true); - ScanResponse response = null; - controller = controllerFactory.newController(); - try { - controller.setPriority(getTableName()); - controller.setCallTimeout(timeout); - response = getStub().scan(controller, request); - Result[] results = ResponseConverter.getResults(controller.cellScanner(), - response); - if (response.hasMoreResultsInRegion()) { - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - setHasMoreResultsContext(false); - } - // We need to update result metrics since we are overriding call() - updateResultsMetrics(results); - return results; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - } - - @Override - public ScannerCallable getScannerCallableForReplica(int id) { - return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), - scanMetrics, controllerFactory, getCaching(), id); - } - } - - @Override - public Result next() throws IOException { - // If the scanner is closed and there's nothing left in the cache, next is a - // no-op. 
- if (cache.size() == 0 && this.closed) { - return null; - } - if (cache.size() == 0) { - loadCache(); - } - - if (cache.size() > 0) { - return cache.poll(); - } - // if we exhausted this scanner before calling close, write out the scan - // metrics - writeScanMetrics(); - return null; - } - - @Override - protected void loadCache() throws IOException { - Result[] values = null; - long remainingResultSize = maxScannerResultSize; - int countdown = this.caching; - boolean currentRegionDone = false; - // Values == null means server-side filter has determined we must STOP - while (remainingResultSize > 0 && countdown > 0 - && nextScanner(countdown, values == null, currentRegionDone)) { - // Server returns a null values if scanning is to stop. Else, - // returns an empty array if scanning is to go on and we've just - // exhausted current region. - // callWithoutRetries is at this layer. Within the ScannerCallableWithReplicas, - // we do a callWithRetries - values = this.caller.callWithoutRetries(smallScanCallable, scannerTimeout); - this.currentRegion = smallScanCallable.getHRegionInfo(); - long currentTime = System.currentTimeMillis(); - if (this.scanMetrics != null) { - this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - - lastNext); - } - lastNext = currentTime; - if (values != null && values.length > 0) { - for (int i = 0; i < values.length; i++) { - Result rs = values[i]; - cache.add(rs); - // We don't make Iterator here - for (Cell cell : rs.rawCells()) { - remainingResultSize -= CellUtil.estimatedHeapSizeOf(cell); - } - countdown--; - this.lastResult = rs; - } - } - if (smallScanCallable.hasMoreResultsContext()) { - // If the server has more results, the current region is not done - currentRegionDone = !smallScanCallable.getServerHasMoreResults(); - } else { - // not guaranteed to get the context in older versions, fall back to checking countdown - currentRegionDone = countdown > 0; - } - } - } - - public void close() { - if (!scanMetricsPublished) 
writeScanMetrics(); - closed = true; - } - - @VisibleForTesting - protected void setScannerCallableFactory(SmallScannerCallableFactory callableFactory) { - this.callableFactory = callableFactory; - } - - @InterfaceAudience.Private - protected static class SmallScannerCallableFactory { - - public ScannerCallableWithReplicas getCallable(ClusterConnection connection, TableName table, - Scan scan, ScanMetrics scanMetrics, byte[] localStartKey, int cacheNum, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout, - int retries, int scannerTimeout, Configuration conf, RpcRetryingCaller<Result[]> caller) { - scan.setStartRow(localStartKey); - SmallScannerCallable s = new SmallScannerCallable( - connection, table, scan, scanMetrics, controllerFactory, cacheNum, 0); - ScannerCallableWithReplicas scannerCallableWithReplicas = - new ScannerCallableWithReplicas(table, connection, - s, pool, primaryOperationTimeout, scan, retries, - scannerTimeout, cacheNum, conf, caller); - return scannerCallableWithReplicas; - } - - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 5f5badf..ab6cb8d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -1257,8 +1257,7 @@ class ConnectionManager { Scan s = new Scan(); s.setReversed(true); s.setStartRow(metaKey); - s.setSmall(true); - s.setCaching(1); + s.setOneRowLimit(); if (this.useMetaReplicas) { s.setConsistency(Consistency.TIMELINE); } @@ -1286,15 +1285,11 @@ class ConnectionManager { long pauseBase = this.pause; try { Result 
regionInfoRow = null; - ReversedClientScanner rcs = null; - try { - rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this, - rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0); + s.resetMvccReadPoint(); + try (ReversedClientScanner rcs = + new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, + rpcControllerFactory, getMetaLookupPool(), 0)) { regionInfoRow = rcs.next(); - } finally { - if (rcs != null) { - rcs.close(); - } } if (regionInfoRow == null) { http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index c1e6d23..e7b4114 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -17,14 +17,16 @@ */ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -32,8 +34,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; - -import 
com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.util.Bytes; /** * Utility used by client connections. @@ -195,4 +196,36 @@ public class ConnectionUtils { return false; } } + + // A byte array in which all elements are the max byte, and it is used to + // construct closest front row + static final byte[] MAX_BYTE_ARRAY = Bytes.createMaxByteArray(9); + + /** + * Create the closest row after the specified row + */ + static byte[] createClosestRowAfter(byte[] row) { + return Arrays.copyOf(row, row.length + 1); + } + + /** + * Create the closest row before the specified row + * @deprecated in fact, we do not know the closest row before the given row, the result is only a + * row very close to the current row. Avoid using this method in the future. + */ + @Deprecated + static byte[] createClosestRowBefore(byte[] row) { + if (row.length == 0) { + return MAX_BYTE_ARRAY; + } + if (row[row.length - 1] == 0) { + return Arrays.copyOf(row, row.length - 1); + } else { + byte[] nextRow = new byte[row.length + MAX_BYTE_ARRAY.length]; + System.arraycopy(row, 0, nextRow, 0, row.length - 1); + nextRow[row.length - 1] = (byte) ((row[row.length - 1] & 0xFF) - 1); + System.arraycopy(MAX_BYTE_ARRAY, 0, nextRow, row.length, MAX_BYTE_ARRAY.length); + return nextRow; + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 83e7217..1e3a900 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -795,21 +795,9 @@ public class HTable implements HTableInterface, RegionLocator { } if (scan.isReversed()) { - if (scan.isSmall()) { - return new 
ClientSmallReversedScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); - } else { - return new ReversedClientScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); - } - } - - if (scan.isSmall()) { - return new ClientSmallScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ReversedClientScanner(getConfiguration(), scan, getName(), + this.connection, this.rpcCallerFactory, this.rpcControllerFactory, + pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); } else { return new ClientScanner(getConfiguration(), scan, getName(), this.connection, this.rpcCallerFactory, this.rpcControllerFactory, http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index ca998ae..edb66c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -18,15 +18,18 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; + import java.io.IOException; import java.util.concurrent.ExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ExceptionUtil; @@ -58,8 +61,7 @@ public class ReversedClientScanner extends ClientScanner { } @Override - protected boolean nextScanner(int nbRows, final boolean done) - throws IOException { + protected Result[] nextScanner(int nbRows) throws IOException { // Close the previous scanner if it's open closeScanner(); @@ -69,16 +71,17 @@ public class ReversedClientScanner extends ClientScanner { // if we're at start of table, close and return false to stop iterating if (this.currentRegion != null) { byte[] startKey = this.currentRegion.getStartKey(); - if (startKey == null - || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) - || checkScanStopRow(startKey) || done) { + if (startKey == null || Bytes.equals(startKey, HConstants.EMPTY_BYTE_ARRAY) + || checkScanStopRow(startKey)) { close(); if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } - return false; + return null; } localStartKey = startKey; + // clear mvcc read point if we are going to switch regions + scan.resetMvccReadPoint(); if (LOG.isDebugEnabled()) { LOG.debug("Finished " + this.currentRegion); } @@ -109,17 +112,21 @@ public class ReversedClientScanner extends ClientScanner { // beginning of the region // callWithoutRetries is at this layer. 
Within the ScannerCallableWithReplicas, // we do a callWithRetries - this.caller.callWithoutRetries(callable, scannerTimeout); + Result[] rrs = this.caller.callWithoutRetries(callable, scannerTimeout); this.currentRegion = callable.getHRegionInfo(); if (this.scanMetrics != null) { this.scanMetrics.countOfRegions.incrementAndGet(); } + if (rrs != null && rrs.length == 0 && callable.moreResultsForScan() == MoreResults.NO) { + // no results for the scan, return null to terminate the scan. + return null; + } + return rrs; } catch (IOException e) { ExceptionUtil.rethrowIfInterrupt(e); close(); throw e; } - return true; } protected ScannerCallableWithReplicas getScannerCallable(byte[] localStartKey, http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java index e169f7a..840af97 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedScannerCallable.java @@ -44,7 +44,7 @@ public class ReversedScannerCallable extends ScannerCallable { /** * The start row for locating regions. 
In reversed scanner, may locate the * regions for a range of keys when doing - * {@link ReversedClientScanner#nextScanner(int, boolean)} + * {@link ReversedClientScanner#nextScanner(int)} */ protected final byte[] locateStartRow; http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java index 128e7e1..84f1ca9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java @@ -175,6 +175,17 @@ public class Scan extends Query { private long mvccReadPoint = -1L; /** + * The number of rows we want for this scan. We will terminate the scan if the number of return + * rows reaches this value. + */ + private int limit = -1; + + /** + * Control whether to use pread at server side. + */ + private ReadType readType = ReadType.DEFAULT; + + /** * Create a Scan operation across all rows. */ public Scan() {} @@ -253,6 +264,7 @@ public class Scan extends Query { setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax()); } this.mvccReadPoint = scan.getMvccReadPoint(); + this.limit = scan.getLimit(); } /** @@ -1014,6 +1026,62 @@ public class Scan extends Query { } /** + * @return the limit of rows for this scan + */ + public int getLimit() { + return limit; + } + + /** + * Set the limit of rows for this scan. We will terminate the scan if the number of returned rows + * reaches this value. + * <p> + * This condition will be tested at last, after all other conditions such as stopRow, filter, etc. + * <p> + * Can not be used together with batch and allowPartial. 
+ * @param limit the limit of rows for this scan + * @return this + */ + public Scan setLimit(int limit) { + this.limit = limit; + return this; + } + + /** + * Call this when you only want to get one row. It will set {@code limit} to {@code 1}, and also + * set {@code readType} to {@link ReadType#PREAD}. + * @return this + */ + public Scan setOneRowLimit() { + return setLimit(1).setReadType(ReadType.PREAD); + } + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public enum ReadType { + DEFAULT, STREAM, PREAD + } + + /** + * @return the read type for this scan + */ + public ReadType getReadType() { + return readType; + } + + /** + * Set the read type for this scan. + * <p> + * Notice that we may choose to use pread even if you specify {@link ReadType#STREAM} here. For + * example, we will always use pread if this is a get scan. + * @return this + */ + public Scan setReadType(ReadType readType) { + this.readType = readType; + return this; + } + + /** * Get the mvcc read point used to open a scanner. 
*/ long getMvccReadPoint() { http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 2dee7ce..55be6da 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -18,6 +18,9 @@ package org.apache.hadoop.hbase.client; +import com.google.protobuf.ServiceException; +import com.google.protobuf.TextFormat; + import java.io.IOException; import java.io.InterruptedIOException; import java.net.UnknownHostException; @@ -26,10 +29,8 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; @@ -37,10 +38,10 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.exceptions.ScannerResetException; import 
org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; @@ -51,12 +52,8 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; -import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.DNS; -import com.google.protobuf.ServiceException; -import com.google.protobuf.TextFormat; - /** * Scanner operations such as create, next, etc. * Used by {@link ResultScanner}s made by {@link HTable}. Passed to a retrying caller such as @@ -82,9 +79,15 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { private int logCutOffLatency = 1000; private static String myAddress; protected final int id; - protected boolean serverHasMoreResultsContext; - protected boolean serverHasMoreResults; + enum MoreResults { + YES, NO, UNKNOWN + } + + private MoreResults moreResultsInRegion; + private MoreResults moreResultsForScan; + + private boolean openScanner; /** * Saves whether or not the most recent response from the server was a heartbeat message. 
 * Heartbeat messages are identified by the flag {@link ScanResponse#getHeartbeatMessage()} @@ -136,6 +139,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { logScannerActivity = conf.getBoolean(LOG_SCANNER_ACTIVITY, false); logCutOffLatency = conf.getInt(LOG_SCANNER_LATENCY_CUTOFF, 1000); this.controllerFactory = rpcControllerFactory; + this.controller = rpcControllerFactory.newController(); } PayloadCarryingRpcController getController() { @@ -189,135 +193,124 @@ } } + private ScanResponse next() throws IOException { + // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server + setHeartbeatMessage(false); + incRPCcallsMetrics(); + ScanRequest request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, + this.scanMetrics != null, renew, scan.getLimit()); + try { + ScanResponse response = getStub().scan(controller, request); + nextCallSeq++; + return response; + } catch (Exception e) { + IOException ioe = ProtobufUtil.handleRemoteException(e); + if (logScannerActivity) { + LOG.info("Got exception making request " + TextFormat.shortDebugString(request) + " to " + + getLocation(), + e); + } + if (logScannerActivity) { + if (ioe instanceof UnknownScannerException) { + try { + HRegionLocation location = + getConnection().relocateRegion(getTableName(), scan.getStartRow()); + LOG.info("Scanner=" + scannerId + " expired, current region location is " + + location.toString()); + } catch (Throwable t) { + LOG.info("Failed to relocate region", t); + } + } else if (ioe instanceof ScannerResetException) { + LOG.info("Scanner=" + scannerId + " has received an exception, and the server " + + "asked us to reset the scanner state.", + ioe); + } + } + // The below conversion of exceptions into DoNotRetryExceptions is a little strange. + // Why not just have these exceptions implement DNRIOE you ask? 
Well, usually we want + // ServerCallable#withRetries to just retry when it gets these exceptions. In here in + // a scan when doing a next in particular, we want to break out and get the scanner to + // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly, + // yeah and hard to follow and in need of a refactor). + if (ioe instanceof NotServingRegionException) { + // Throw a DNRE so that we break out of cycle of calling NSRE + // when what we need is to open scanner against new location. + // Attach NSRE to signal client that it needs to re-setup scanner. + if (this.scanMetrics != null) { + this.scanMetrics.countOfNSRE.incrementAndGet(); + } + throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); + } else if (ioe instanceof RegionServerStoppedException) { + // Throw a DNRE so that we break out of cycle of the retries and instead go and + // open scanner against new location. + throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); + } else { + // The outer layers will retry + throw ioe; + } + } + } + + private void setAlreadyClosed() { + this.scannerId = -1L; + this.closed = true; + } @Override - public Result [] call(int callTimeout) throws IOException { + public Result[] call(int callTimeout) throws IOException { if (Thread.interrupted()) { throw new InterruptedIOException(); } - - if (controller == null) { - controller = controllerFactory.newController(); - controller.setPriority(getTableName()); - controller.setCallTimeout(callTimeout); - } - if (closed) { - if (scannerId != -1) { - close(); + close(); + return null; + } + controller.reset(); + controller.setPriority(getTableName()); + controller.setCallTimeout(callTimeout); + ScanResponse response; + if (this.scannerId == -1L) { + this.openScanner = true; + response = openScanner(); + } else { + this.openScanner = false; + response = next(); + } + long timestamp = System.currentTimeMillis(); + 
setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); + Result[] rrs = ResponseConverter.getResults(controller.cellScanner(), response); + if (logScannerActivity) { + long now = System.currentTimeMillis(); + if (now - timestamp > logCutOffLatency) { + int rows = rrs == null ? 0 : rrs.length; + LOG.info("Took " + (now - timestamp) + "ms to fetch " + rows + " rows from scanner=" + + scannerId); + } + } + updateServerSideMetrics(response); + // moreResults is only used for the case where a filter exhausts all elements + if (response.hasMoreResults()) { + if (response.getMoreResults()) { + setMoreResultsForScan(MoreResults.YES); + } else { + setMoreResultsForScan(MoreResults.NO); + setAlreadyClosed(); } } else { - if (scannerId == -1L) { - this.scannerId = openScanner(); + setMoreResultsForScan(MoreResults.UNKNOWN); + } + if (response.hasMoreResultsInRegion()) { + if (response.getMoreResultsInRegion()) { + setMoreResultsInRegion(MoreResults.YES); } else { - Result [] rrs = null; - ScanRequest request = null; - // Reset the heartbeat flag prior to each RPC in case an exception is thrown by the server - setHeartbeatMessage(false); - try { - incRPCcallsMetrics(); - request = - RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq, - this.scanMetrics != null, renew); - ScanResponse response = null; - try { - response = getStub().scan(controller, request); - // Client and RS maintain a nextCallSeq number during the scan. Every next() call - // from client to server will increment this number in both sides. Client passes this - // number along with the request and at RS side both the incoming nextCallSeq and its - // nextCallSeq will be matched. In case of a timeout this increment at the client side - // should not happen. If at the server side fetching of next batch of data was over, - // there will be mismatch in the nextCallSeq number. 
Server will throw - // OutOfOrderScannerNextException and then client will reopen the scanner with startrow - // as the last successfully retrieved row. - // See HBASE-5974 - nextCallSeq++; - long timestamp = System.currentTimeMillis(); - setHeartbeatMessage(response.hasHeartbeatMessage() && response.getHeartbeatMessage()); - // Results are returned via controller - CellScanner cellScanner = controller.cellScanner(); - rrs = ResponseConverter.getResults(cellScanner, response); - if (logScannerActivity) { - long now = System.currentTimeMillis(); - if (now - timestamp > logCutOffLatency) { - int rows = rrs == null ? 0 : rrs.length; - LOG.info("Took " + (now-timestamp) + "ms to fetch " - + rows + " rows from scanner=" + scannerId); - } - } - updateServerSideMetrics(response); - // moreResults is only used for the case where a filter exhausts all elements - if (response.hasMoreResults() && !response.getMoreResults()) { - scannerId = -1L; - closed = true; - // Implied that no results were returned back, either. - return null; - } - // moreResultsInRegion explicitly defines when a RS may choose to terminate a batch due - // to size or quantity of results in the response. - if (response.hasMoreResultsInRegion()) { - // Set what the RS said - setHasMoreResultsContext(true); - setServerHasMoreResults(response.getMoreResultsInRegion()); - } else { - // Server didn't respond whether it has more results or not. 
- setHasMoreResultsContext(false); - } - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); - } - updateResultsMetrics(rrs); - } catch (IOException e) { - if (logScannerActivity) { - LOG.info("Got exception making request " + TextFormat.shortDebugString(request) - + " to " + getLocation(), e); - } - IOException ioe = e; - if (e instanceof RemoteException) { - ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e); - } - if (logScannerActivity) { - if (ioe instanceof UnknownScannerException) { - try { - HRegionLocation location = - getConnection().relocateRegion(getTableName(), scan.getStartRow()); - LOG.info("Scanner=" + scannerId - + " expired, current region location is " + location.toString()); - } catch (Throwable t) { - LOG.info("Failed to relocate region", t); - } - } else if (ioe instanceof ScannerResetException) { - LOG.info("Scanner=" + scannerId + " has received an exception, and the server " - + "asked us to reset the scanner state.", ioe); - } - } - // The below convertion of exceptions into DoNotRetryExceptions is a little strange. - // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want - // ServerCallable#withRetries to just retry when it gets these exceptions. In here in - // a scan when doing a next in particular, we want to break out and get the scanner to - // reset itself up again. Throwing a DNRIOE is how we signal this to happen (its ugly, - // yeah and hard to follow and in need of a refactor). - if (ioe instanceof NotServingRegionException) { - // Throw a DNRE so that we break out of cycle of calling NSRE - // when what we need is to open scanner against new location. - // Attach NSRE to signal client that it needs to re-setup scanner. 
- if (this.scanMetrics != null) { - this.scanMetrics.countOfNSRE.incrementAndGet(); - } - throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); - } else if (ioe instanceof RegionServerStoppedException) { - // Throw a DNRE so that we break out of cycle of the retries and instead go and - // open scanner against new location. - throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe); - } else { - // The outer layers will retry - throw ioe; - } - } - return rrs; + setMoreResultsInRegion(MoreResults.NO); + setAlreadyClosed(); } + } else { + setMoreResultsInRegion(MoreResults.UNKNOWN); } - return null; + updateResultsMetrics(rrs); + return rrs; } /** @@ -326,11 +319,11 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { * scan request exceeds a certain time threshold. Heartbeats allow the server to avoid * timeouts during long running scan operations. */ - protected boolean isHeartbeatMessage() { + boolean isHeartbeatMessage() { return heartbeatMessage; } - protected void setHeartbeatMessage(boolean heartbeatMessage) { + private void setHeartbeatMessage(boolean heartbeatMessage) { this.heartbeatMessage = heartbeatMessage; } @@ -397,12 +390,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { this.scannerId = -1L; } - protected long openScanner() throws IOException { + private ScanResponse openScanner() throws IOException { incRPCcallsMetrics(); - ScanRequest request = - RequestConverter.buildScanRequest( - getLocation().getRegionInfo().getRegionName(), - this.scan, 0, false); + ScanRequest request = RequestConverter.buildScanRequest( + getLocation().getRegionInfo().getRegionName(), this.scan, this.caching, false); try { ScanResponse response = getStub().scan(controller, request); long id = response.getScannerId(); @@ -413,9 +404,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { if (response.hasMvccReadPoint()) { 
this.scan.setMvccReadPoint(response.getMvccReadPoint()); } - return id; - } catch (ServiceException se) { - throw ProtobufUtil.getRemoteException(se); + this.scannerId = id; + return response; + } catch (Exception e) { + throw ProtobufUtil.handleRemoteException(e); } } @@ -480,27 +472,31 @@ public class ScannerCallable extends RegionServerCallable<Result[]> { /** * Should the client attempt to fetch more results from this region - * @return True if the client should attempt to fetch more results, false otherwise. */ - protected boolean getServerHasMoreResults() { - assert serverHasMoreResultsContext; - return this.serverHasMoreResults; + MoreResults moreResultsInRegion() { + return moreResultsInRegion; } - protected void setServerHasMoreResults(boolean serverHasMoreResults) { - this.serverHasMoreResults = serverHasMoreResults; + void setMoreResultsInRegion(MoreResults moreResults) { + this.moreResultsInRegion = moreResults; } /** - * Did the server respond with information about whether more results might exist. - * Not guaranteed to respond with older server versions - * @return True if the server responded with information about more results. + * Should the client attempt to fetch more results for the whole scan. */ - protected boolean hasMoreResultsContext() { - return serverHasMoreResultsContext; + MoreResults moreResultsForScan() { + return moreResultsForScan; + } + + void setMoreResultsForScan(MoreResults moreResults) { + this.moreResultsForScan = moreResults; } - protected void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { - this.serverHasMoreResultsContext = serverHasMoreResultsContext; + /** + * Whether the previous call is openScanner. This is used to keep compatible with the old + * implementation that we always returns empty result for openScanner. 
+ */ + boolean isOpenScanner() { + return openScanner; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/4456d228/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index a030e67..46c8f9c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -18,7 +18,10 @@ package org.apache.hadoop.hbase.client; -import static org.apache.hadoop.hbase.client.ClientScanner.createClosestRowBefore; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter; +import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore; + +import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; @@ -30,22 +33,18 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults; 
import org.apache.hadoop.hbase.util.Pair; -import com.google.common.annotations.VisibleForTesting; - /** * This class has the logic for handling scanners for regions with and without replicas. * 1. A scan is attempted on the default (primary) region @@ -115,20 +114,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { return currentScannerCallable.getHRegionInfo(); } - public boolean getServerHasMoreResults() { - return currentScannerCallable.getServerHasMoreResults(); - } - - public void setServerHasMoreResults(boolean serverHasMoreResults) { - currentScannerCallable.setServerHasMoreResults(serverHasMoreResults); + public MoreResults moreResultsInRegion() { + return currentScannerCallable.moreResultsInRegion(); } - public boolean hasMoreResultsContext() { - return currentScannerCallable.hasMoreResultsContext(); + public MoreResults moreResultsForScan() { + return currentScannerCallable.moreResultsForScan(); } - public void setHasMoreResultsContext(boolean serverHasMoreResultsContext) { - currentScannerCallable.setHasMoreResultsContext(serverHasMoreResultsContext); + public boolean isOpenScanner() { + return currentScannerCallable.isOpenScanner(); } @Override @@ -342,7 +337,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { if (callable.getScan().isReversed()) { callable.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow())); } else { - callable.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new byte[1])); + callable.getScan().setStartRow(createClosestRowAfter(this.lastResult.getRow())); } } }
