Repository: hbase
Updated Branches:
  refs/heads/branch-1 2ff16e532 -> 26749c347


HBASE-12761 On region jump ClientScanners should get next row start key instead 
of a skip.

Signed-off-by: stack <[email protected]>

Conflicts:
        
hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26749c34
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26749c34
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26749c34

Branch: refs/heads/branch-1
Commit: 26749c347d5ca34844743f16f57a36d99646a81f
Parents: 2ff16e5
Author: Jurriaan Mous <[email protected]>
Authored: Sun Dec 28 13:53:03 2014 +0100
Committer: stack <[email protected]>
Committed: Tue Dec 30 14:49:58 2014 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      | 66 ++++++--------------
 .../hbase/client/ReversedClientScanner.java     |  5 +-
 .../client/ScannerCallableWithReplicas.java     | 11 +++-
 3 files changed, 33 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/26749c34/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 2ac5652..b4e515f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -45,6 +45,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import static 
org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;
+
 /**
  * Implements the scanner interface for the HBase client.
  * If there are multiple regions in a table, this scanner will iterate
@@ -92,7 +94,8 @@ public class ClientScanner extends AbstractClientScanner {
    */
   public ClientScanner(final Configuration conf, final Scan scan, final 
TableName tableName,
       ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int 
primaryOperationTimeout) throws IOException {
+      RpcControllerFactory controllerFactory, ExecutorService pool, int 
primaryOperationTimeout)
+      throws IOException {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Scan table=" + tableName
             + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -229,7 +232,7 @@ public class ClientScanner extends AbstractClientScanner {
       // Close the previous scanner if it's open
       if (this.callable != null) {
         this.callable.setClose();
-        call(scan, callable, caller, scannerTimeout);
+        call(callable, caller, scannerTimeout);
         this.callable = null;
       }
 
@@ -266,7 +269,7 @@ public class ClientScanner extends AbstractClientScanner {
         callable = getScannerCallable(localStartKey, nbRows);
         // Open a scanner on the region server starting at the
         // beginning of the region
-        call(scan, callable, caller, scannerTimeout);
+        call(callable, caller, scannerTimeout);
         this.currentRegion = callable.getHRegionInfo();
         if (this.scanMetrics != null) {
           this.scanMetrics.countOfRegions.incrementAndGet();
@@ -283,7 +286,7 @@ public class ClientScanner extends AbstractClientScanner {
     return callable.isAnyRPCcancelled();
   }
 
-  static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
+  static Result[] call(ScannerCallableWithReplicas callable,
       RpcRetryingCaller<Result[]> caller, int scannerTimeout)
       throws IOException, RuntimeException {
     if (Thread.interrupted()) {
@@ -310,12 +313,12 @@ public class ClientScanner extends AbstractClientScanner {
 
     /**
      * Publish the scan metrics. For now, we use scan.setAttribute to pass the 
metrics back to the
-     * application or TableInputFormat.Later, we could push it to other 
systems. We don't use metrics
-     * framework because it doesn't support multi-instances of the same 
metrics on the same machine;
-     * for scan/map reduce scenarios, we will have multiple scans running at 
the same time.
+     * application or TableInputFormat. Later, we could push it to other 
systems. We don't use
+     * metrics framework because it doesn't support multi-instances of the 
same metrics on the same
+     * machine; for scan/map reduce scenarios, we will have multiple scans 
running at the same time.
      *
-     * By default, scan metrics are disabled; if the application wants to 
collect them, this behavior
-     * can be turned on by calling calling:
+     * By default, scan metrics are disabled; if the application wants to 
collect them, this
+     * behavior can be turned on by calling:
      *
      * scan.setAttribute(SCAN_ATTRIBUTES_METRICS_ENABLE, 
Bytes.toBytes(Boolean.TRUE))
      */
@@ -343,39 +346,13 @@ public class ClientScanner extends AbstractClientScanner {
         callable.setCaching(this.caching);
         // This flag is set when we want to skip the result returned.  We do
         // this when we reset scanner because it split under us.
-        boolean skipFirst = false;
         boolean retryAfterOutOfOrderException  = true;
         do {
           try {
-            if (skipFirst) {
-              // Skip only the first row (which was the last row of the last
-              // already-processed batch).
-              callable.setCaching(1);
-              values = call(scan, callable, caller, scannerTimeout);
-              // When the replica switch happens, we need to do certain 
operations
-              // again. The scannercallable will openScanner with the right 
startkey
-              // but we need to pick up from there. Bypass the rest of the loop
-              // and let the catch-up happen in the beginning of the loop as it
-              // happens for the cases where we see exceptions. Since only 
openScanner
-              // would have happened, values would be null
-              if (values == null && callable.switchedToADifferentReplica()) {
-                if (this.lastResult != null) { //only skip if there was 
something read earlier
-                  skipFirst = true;
-                }
-                this.currentRegion = callable.getHRegionInfo();
-                continue;
-              }
-              callable.setCaching(this.caching);
-              skipFirst = false;
-            }
             // Server returns a null values if scanning is to stop.  Else,
             // returns an empty array if scanning is to go on and we've just
             // exhausted current region.
-            values = call(scan, callable, caller, scannerTimeout);
-            if (skipFirst && values != null && values.length == 1) {
-              skipFirst = false; // Already skipped, unset it before scanning 
again
-              values = call(scan, callable, caller, scannerTimeout);
-            }
+            values = call(callable, caller, scannerTimeout);
             // When the replica switch happens, we need to do certain 
operations
             // again. The callable will openScanner with the right startkey
             // but we need to pick up from there. Bypass the rest of the loop
@@ -383,9 +360,6 @@ public class ClientScanner extends AbstractClientScanner {
             // happens for the cases where we see exceptions. Since only 
openScanner
             // would have happened, values would be null
             if (values == null && callable.switchedToADifferentReplica()) {
-              if (this.lastResult != null) { //only skip if there was 
something read earlier
-                skipFirst = true;
-              }
               this.currentRegion = callable.getHRegionInfo();
               continue;
             }
@@ -428,11 +402,11 @@ public class ClientScanner extends AbstractClientScanner {
               // scanner starts at the correct row. Otherwise we may see 
previously
               // returned rows again.
               // (ScannerCallable by now has "relocated" the correct region)
-              this.scan.setStartRow(this.lastResult.getRow());
-
-              // Skip first row returned.  We already let it out on previous
-              // invocation.
-              skipFirst = true;
+              if(scan.isReversed()){
+                scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
+              }else {
+                scan.setStartRow(Bytes.add(lastResult.getRow(), new byte[1]));
+              }
             }
             if (e instanceof OutOfOrderScannerNextException) {
               if (retryAfterOutOfOrderException) {
@@ -452,7 +426,7 @@ public class ClientScanner extends AbstractClientScanner {
             continue;
           }
           long currentTime = System.currentTimeMillis();
-          if (this.scanMetrics != null ) {
+          if (this.scanMetrics != null) {
             
this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime-lastNext);
           }
           lastNext = currentTime;
@@ -487,7 +461,7 @@ public class ClientScanner extends AbstractClientScanner {
       if (callable != null) {
         callable.setClose();
         try {
-          call(scan, callable, caller, scannerTimeout);
+          call(callable, caller, scannerTimeout);
         } catch (UnknownScannerException e) {
            // We used to catch this error, interpret, and rethrow. However, we
            // have since decided that it's not nice for a scanner's close to

http://git-wip-us.apache.org/repos/asf/hbase/blob/26749c34/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index a03858e..8681e19 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -57,7 +57,8 @@ public class ReversedClientScanner extends ClientScanner {
       TableName tableName, ClusterConnection connection,
       RpcRetryingCallerFactory rpcFactory, RpcControllerFactory 
controllerFactory,
       ExecutorService pool, int primaryOperationTimeout) throws IOException {
-    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, 
pool, primaryOperationTimeout);
+    super(conf, scan, tableName, connection, rpcFactory, controllerFactory, 
pool,
+        primaryOperationTimeout);
   }
 
   @Override
@@ -166,7 +167,7 @@ public class ReversedClientScanner extends ClientScanner {
    * @param row
    * @return a new byte array which is the closest front row of the specified 
one
    */
-  protected byte[] createClosestRowBefore(byte[] row) {
+  protected static byte[] createClosestRowBefore(byte[] row) {
     if (row == null) {
       throw new IllegalArgumentException("The passed row is empty");
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26749c34/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 8b6fc5c..23a3ff5 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -39,9 +39,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import static 
org.apache.hadoop.hbase.client.ReversedClientScanner.createClosestRowBefore;
+
 /**
  * This class has the logic for handling scanners for regions with and without 
replicas.
  * 1. A scan is attempted on the default (primary) region
@@ -272,8 +276,13 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
         continue; //this was already scheduled earlier
       }
       ScannerCallable s = 
currentScannerCallable.getScannerCallableForReplica(id);
+
       if (this.lastResult != null) {
-        s.getScan().setStartRow(this.lastResult.getRow());
+        if(s.getScan().isReversed()){
+          
s.getScan().setStartRow(createClosestRowBefore(this.lastResult.getRow()));
+        }else {
+          s.getScan().setStartRow(Bytes.add(this.lastResult.getRow(), new 
byte[1]));
+        }
       }
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);

Reply via email to