HBASE-16604 Scanner retries on IOException can cause the scans to miss data

Conflicts:
        hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java

Conflicts:
        hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9d424c20
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9d424c20
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9d424c20

Branch: refs/heads/branch-1.1
Commit: 9d424c20c1caabcabf874a3d4f6b774e83886b57
Parents: 97ce640
Author: Enis Soztutar <e...@apache.org>
Authored: Thu Sep 22 17:41:01 2016 -0700
Committer: Enis Soztutar <e...@apache.org>
Committed: Thu Sep 22 18:44:19 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/UnknownScannerException.java   |   4 +
 .../hadoop/hbase/client/ClientScanner.java      |   4 +-
 .../hadoop/hbase/client/ScannerCallable.java    |  26 +++--
 .../hbase/exceptions/ScannerResetException.java |  50 +++++++++
 .../hbase/ipc/MetricsHBaseServerSource.java     |   2 +
 .../hbase/ipc/MetricsHBaseServerSourceImpl.java |   8 ++
 .../hadoop/hbase/client/VersionInfoUtil.java    |  63 +++++++++++
 .../hadoop/hbase/ipc/MetricsHBaseServer.java    |   3 +
 .../hbase/regionserver/RSRpcServices.java       |  65 +++++++----
 .../hadoop/hbase/HBaseTestingUtility.java       |  18 ++-
 .../hadoop/hbase/client/TestFromClientSide.java |  77 ++++++++++++-
 .../hbase/client/TestTableSnapshotScanner.java  |   2 +-
 .../TableSnapshotInputFormatTestBase.java       |   2 +-
 .../mapreduce/TestMultithreadedTableMapper.java |   3 +-
 .../hbase/mapreduce/TestTableMapReduce.java     |   4 +-
 .../hbase/mapreduce/TestTableMapReduceBase.java |   2 +-
 .../regionserver/DelegatingKeyValueScanner.java | 109 +++++++++++++++++++
 17 files changed, 402 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
index b951221..3e7b22d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
@@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
   public UnknownScannerException(String s) {
     super(s);
   }
+
+  public UnknownScannerException(String s, Exception e) {
+    super(s, e);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index c504d9b..4a5e635 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -433,7 +434,8 @@ public class ClientScanner extends AbstractClientScanner {
         if ((cause != null && cause instanceof NotServingRegionException) ||
             (cause != null && cause instanceof RegionServerStoppedException) ||
             e instanceof OutOfOrderScannerNextException ||
-            e instanceof UnknownScannerException ) {
+            e instanceof UnknownScannerException ||
+            e instanceof ScannerResetException) {
           // Pass. It is easier writing the if loop test as list of what is allowed rather than
           // as a list of what is not allowed... so if in here, it means we do not throw.
         } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index fc21efa..0a22f37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -104,9 +105,9 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    * @param connection which connection
    * @param tableName table callable is on
    * @param scan the scan to execute
-   * @param scanMetrics the ScanMetrics to used, if it is null, 
+   * @param scanMetrics the ScanMetrics to used, if it is null,
    *        ScannerCallable won't collect metrics
-   * @param rpcControllerFactory factory to use when creating 
+   * @param rpcControllerFactory factory to use when creating
    *        {@link com.google.protobuf.RpcController}
    */
   public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan,
@@ -265,14 +266,19 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
           if (e instanceof RemoteException) {
             ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
           }
-          if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
-            try {
-              HRegionLocation location =
-                getConnection().relocateRegion(getTableName(), scan.getStartRow());
-              LOG.info("Scanner=" + scannerId
-                + " expired, current region location is " + location.toString());
-            } catch (Throwable t) {
-              LOG.info("Failed to relocate region", t);
+          if (logScannerActivity) {
+            if (ioe instanceof UnknownScannerException) {
+              try {
+                HRegionLocation location =
+                    getConnection().relocateRegion(getTableName(), scan.getStartRow());
+                LOG.info("Scanner=" + scannerId
+                  + " expired, current region location is " + location.toString());
+              } catch (Throwable t) {
+                LOG.info("Failed to relocate region", t);
+              }
+            } else if (ioe instanceof ScannerResetException) {
+              LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+                  + "asked us to reset the scanner state.", ioe);
             }
           }
           // The below convertion of exceptions into DoNotRetryExceptions is a little strange.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
new file mode 100644
index 0000000..7689eb1
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when the server side has received an Exception, and asks the Client to reset the scanner
+ * state by closing the current region scanner, and reopening from the start of last seen row.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ScannerResetException extends DoNotRetryIOException {
+  private static final long serialVersionUID = -5649728171144849619L;
+
+  /** constructor */
+  public ScannerResetException() {
+    super();
+  }
+
+  /**
+   * Constructor
+   * @param s message
+   */
+  public ScannerResetException(String s) {
+    super(s);
+  }
+
+  public ScannerResetException(String s, Exception e) {
+    super(s, e);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index 482fdba..6676d93 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -64,6 +64,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
   String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
   String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
   String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
+  String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
   String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
   String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
   String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@@ -86,6 +87,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
   void movedRegionException();
   void notServingRegionException();
   void unknownScannerException();
+  void scannerResetException();
   void tooBusyException();
 
   void sentBytes(long count);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index 18226df..676cf36 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -43,6 +43,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
   private final MutableCounterLong exceptionsOOO;
   private final MutableCounterLong exceptionsBusy;
   private final MutableCounterLong exceptionsUnknown;
+  private final MutableCounterLong exceptionsScannerReset;
   private final MutableCounterLong exceptionsSanity;
   private final MutableCounterLong exceptionsNSRE;
   private final MutableCounterLong exceptionsMoved;
@@ -72,6 +73,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
         .newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsUnknown = this.getMetricsRegistry()
         .newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+    this.exceptionsScannerReset = this.getMetricsRegistry()
+        .newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsSanity = this.getMetricsRegistry()
         .newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsMoved = this.getMetricsRegistry()
@@ -141,6 +144,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
   }
 
   @Override
+  public void scannerResetException() {
+    exceptionsScannerReset.incr();
+  }
+
+  @Override
   public void tooBusyException() {
     exceptionsBusy.incr();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
new file mode 100644
index 0000000..f11fd9e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/VersionInfoUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.RpcCallContext;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+
+
+/**
+ * Class to help with parsing the version info.
+ */
+@InterfaceAudience.Private
+public final class VersionInfoUtil {
+
+  private VersionInfoUtil() {
+    /* UTIL CLASS ONLY */
+  }
+
+  public static boolean currentClientHasMinimumVersion(int major, int minor) {
+    RpcCallContext call = RpcServer.getCurrentCall();
+    RPCProtos.VersionInfo versionInfo = call != null ? call.getClientVersionInfo() : null;
+    return hasMinimumVersion(versionInfo, major, minor);
+  }
+
+  public static boolean hasMinimumVersion(RPCProtos.VersionInfo versionInfo,
+                                          int major,
+                                          int minor) {
+    if (versionInfo != null) {
+      try {
+        String[] components = versionInfo.getVersion().split("\\.");
+
+        int clientMajor = components.length > 0 ? Integer.parseInt(components[0]) : 0;
+        if (clientMajor != major) {
+          return clientMajor > major;
+        }
+
+        int clientMinor = components.length > 1 ? Integer.parseInt(components[1]) : 0;
+        return clientMinor >= minor;
+      } catch (NumberFormatException e) {
+        return false;
+      }
+    }
+    return false;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index 3ca50ad..cdd607f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 
 @InterfaceAudience.Private
 public class MetricsHBaseServer {
@@ -91,6 +92,8 @@ public class MetricsHBaseServer {
         source.tooBusyException();
       } else if (throwable instanceof UnknownScannerException) {
         source.unknownScannerException();
+      } else if (throwable instanceof ScannerResetException) {
+        source.scannerResetException();
       } else if (throwable instanceof RegionMovedException) {
         source.movedRegionException();
       } else if (throwable instanceof NotServingRegionException) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 3217b98..5b087a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -65,12 +65,14 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.VersionInfoUtil;
 import org.apache.hadoop.hbase.coordination.CloseRegionCoordination;
 import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.exceptions.OperationConflictException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -2517,13 +2519,23 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
           }
         } catch (IOException e) {
-          // if we have an exception on scanner next and we are using the callSeq
-          // we should rollback because the client will retry with the same callSeq
-          // and get an OutOfOrderScannerNextException if we don't do so.
-          if (rsh != null && request.hasNextCallSeq()) {
-            rsh.rollbackNextCallSeq();
+          // The scanner state might be left in a dirty state, so we will tell the Client to
+          // fail this RPC and close the scanner while opening up another one from the start of
+          // row that the client has last seen.
+          closeScanner(region, scanner, scannerName);
+
+          // We closed the scanner already. Instead of throwing the IOException, and client
+          // retrying with the same scannerId only to get USE on the next RPC, we directly throw
+          // a special exception to save an RPC.
+          RpcCallContext context = RpcServer.getCurrentCall();
+          if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+            // 1.4.0+ clients know how to handle
+            throw new ScannerResetException("Scanner is closed on the server-side", e);
+          } else {
+            // older clients do not know about SRE. Just throw USE, which they will handle
+            throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+                + " scanner state for clients older than 1.3.", e);
           }
-          throw e;
         } finally {
           // We're done. On way out re-add the above removed lease.
           // Adding resets expiration time on lease.
@@ -2537,20 +2549,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (!moreResults || closeScanner) {
         ttl = 0;
         moreResults = false;
-        if (region != null && region.getCoprocessorHost() != null) {
-          if (region.getCoprocessorHost().preScannerClose(scanner)) {
-            return builder.build(); // bypass
-          }
-        }
-        rsh = scanners.remove(scannerName);
-        if (rsh != null) {
-          scanner = rsh.s;
-          scanner.close();
-          regionServer.leases.cancelLease(scannerName);
-          if (region != null && region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().postScannerClose(scanner);
-          }
-        }
+        closeScanner(region, scanner, scannerName);
       }
 
       if (ttl > 0) {
@@ -2581,6 +2580,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
+      throws IOException {
+    if (region != null && region.getCoprocessorHost() != null) {
+      if (region.getCoprocessorHost().preScannerClose(scanner)) {
+        return true; // bypass
+      }
+    }
+    RegionScannerHolder rsh = scanners.remove(scannerName);
+    if (rsh != null) {
+      scanner = rsh.s;
+      scanner.close();
+      try {
+        regionServer.leases.cancelLease(scannerName);
+      } catch (LeaseException le) {
+        // No problem, ignore
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
+        }
+      }
+      if (region != null && region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postScannerClose(scanner);
+      }
+    }
+    return false;
+  }
+
   @Override
   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
       CoprocessorServiceRequest request) throws ServiceException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 44ec24a..b06e489 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1953,6 +1953,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     return HRegion.createHRegion(info, rootDir, conf, htd);
   }
 
+  public HTableDescriptor createTableDescriptor(final TableName tableName,
+      byte[] family) {
+    return createTableDescriptor(tableName, new byte[][] {family}, 1);
+  }
+
+  public HTableDescriptor createTableDescriptor(final TableName tableName,
+      byte[][] families, int maxVersions) {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    for (byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family)
+          .setMaxVersions(maxVersions);
+      desc.addFamily(hcd);
+    }
+    return desc;
+  }
+
   /**
    * Create an HRegion that writes to the local tmp dirs
    * @param desc
@@ -2164,7 +2180,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       Put put = new Put(row);
       put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
       for (int i = 0; i < f.length; i++) {
-        put.add(f[i], null, value != null ? value : row);
+        put.add(f[i], f[i], value != null ? value : row);
       }
       puts.add(put);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 5aba902..aa8936c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -37,13 +36,14 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.log4j.Level;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
@@ -65,8 +65,11 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -92,11 +95,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
-import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -588,6 +594,71 @@ public class TestFromClientSide {
     assertEquals(rowCount - endKeyCount, countGreater);
   }
 
+  /**
+   * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
+   * fail with an IOException() on the first call.
+   */
+  public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
+    static AtomicLong reqCount = new AtomicLong(0);
+    class MyStoreScanner extends StoreScanner {
+      public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
+          long readPt) throws IOException {
+        super(store, scanInfo, scan, columns, readPt);
+      }
+
+      @Override
+      protected List<KeyValueScanner> selectScannersFrom(
+          List<? extends KeyValueScanner> allScanners) {
+        List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
+        List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
+        for (KeyValueScanner scanner : scanners) {
+          newScanners.add(new DelegatingKeyValueScanner(scanner) {
+            @Override
+            public boolean reseek(Cell key) throws IOException {
+              if (reqCount.incrementAndGet() == 1) {
+                throw new IOException("Injected exception");
+              }
+              return super.reseek(key);
+            }
+          });
+        }
+        return newScanners;
+      }
+    }
+
+    @Override
+    public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+        Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
+            throws IOException {
+      return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, Long.MAX_VALUE);
+    }
+  }
+
+  /**
+   * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek
+   * leaving the server side RegionScanner to be in dirty state. The client has to ensure that the
+   * ClientScanner does not get an exception and also sees all the data.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testClientScannerIsResetWhenScanThrowsIOException()
+  throws IOException, InterruptedException {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+    TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
+
+    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+    htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+    TEST_UTIL.getHBaseAdmin().createTable(htd);
+    try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+      int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+      TEST_UTIL.getHBaseAdmin().flush(name);
+      int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+      assertEquals(rowCount, actualRowCount);
+    }
+    assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
+  }
+
   /*
    * @param key
    * @return Scan with RowFilter that does LESS than passed key.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 3e915e1..0e2b670 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -183,7 +183,7 @@ public class TestTableSnapshotScanner {
     }
 
     for (int j = 0; j < FAMILIES.length; j++) {
-      byte[] actual = result.getValue(FAMILIES[j], null);
+      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
       Assert.assertArrayEquals("Row in snapshot does not match, expected:" + 
Bytes.toString(row)
           + " ,actual:" + Bytes.toString(actual), row, actual);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index b35a5d3..190c245 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -180,7 +180,7 @@ public abstract class TableSnapshotInputFormatTestBase {
     }
 
     for (int j = 0; j < FAMILIES.length; j++) {
-      byte[] actual = result.getValue(FAMILIES[j], null);
+      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
       Assert.assertArrayEquals("Row in snapshot does not match, expected:" + 
Bytes.toString(row)
         + " ,actual:" + Bytes.toString(actual), row, actual);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
index 7ce4a63..fe3d9d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
@@ -98,6 +98,7 @@ public class TestMultithreadedTableMapper {
      * @param context
      * @throws IOException
      */
+    @Override
     public void map(ImmutableBytesWritable key, Result value,
         Context context)
             throws IOException, InterruptedException {
@@ -111,7 +112,7 @@ public class TestMultithreadedTableMapper {
             Bytes.toString(INPUT_FAMILY) + "'.");
       }
       // Get the original value and reverse it
-      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, 
null));
+      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, 
INPUT_FAMILY));
       StringBuilder newValue = new StringBuilder(originalValue);
       newValue.reverse();
       // Now set the value to be collected

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index 6fb9460..fde1cd9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -51,6 +51,7 @@ import org.junit.experimental.categories.Category;
 public class TestTableMapReduce extends TestTableMapReduceBase {
   private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
 
+  @Override
   protected Log getLog() { return LOG; }
 
   /**
@@ -66,6 +67,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
      * @param context
      * @throws IOException
      */
+    @Override
     public void map(ImmutableBytesWritable key, Result value,
       Context context)
     throws IOException, InterruptedException {
@@ -80,7 +82,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
       }
 
       // Get the original value and reverse it
-      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, 
null));
+      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, 
INPUT_FAMILY));
       StringBuilder newValue = new StringBuilder(originalValue);
       newValue.reverse();
       // Now set the value to be collected

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
index 084b80f..1e2efb6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -122,7 +122,7 @@ public abstract class TestTableMapReduceBase {
 
     // Get the original value and reverse it
 
-    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, 
INPUT_FAMILY));
     StringBuilder newValue = new StringBuilder(originalValue);
     newValue.reverse();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9d424c20/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
new file mode 100644
index 0000000..c680654
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+public class DelegatingKeyValueScanner implements KeyValueScanner {
+  protected KeyValueScanner delegate;
+
+  public DelegatingKeyValueScanner(KeyValueScanner delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public Cell peek() {
+    return delegate.peek();
+  }
+
+  @Override
+  public Cell next() throws IOException {
+    return delegate.next();
+  }
+
+  @Override
+  public boolean seek(Cell key) throws IOException {
+    return delegate.seek(key);
+  }
+
+  @Override
+  public boolean reseek(Cell key) throws IOException {
+    return delegate.reseek(key);
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  @Override
+  public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long 
oldestUnexpiredTS) {
+    return delegate.shouldUseScanner(scan, columns, oldestUnexpiredTS);
+  }
+
+  @Override
+  public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) 
throws IOException {
+    return delegate.requestSeek(kv, forward, useBloom);
+  }
+
+  @Override
+  public long getSequenceID() {
+    return delegate.getSequenceID();
+  }
+
+  @Override
+  public boolean realSeekDone() {
+    return delegate.realSeekDone();
+  }
+
+  @Override
+  public void enforceSeek() throws IOException {
+    delegate.enforceSeek();
+  }
+
+  @Override
+  public boolean isFileScanner() {
+    return delegate.isFileScanner();
+  }
+
+  @Override
+  public boolean backwardSeek(Cell key) throws IOException {
+    return delegate.backwardSeek(key);
+  }
+
+  @Override
+  public boolean seekToPreviousRow(Cell key) throws IOException {
+    return delegate.seekToPreviousRow(key);
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    return delegate.seekToLastRow();
+  }
+
+  @Override
+  public Cell getNextIndexedKey() {
+    return delegate.getNextIndexedKey();
+  }
+}

Reply via email to