This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch hubspot-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 7023542cce0b40b2ec2a3eb1c421501858085afb
Author: Bryan Beaudreault <bbeaudrea...@hubspot.com>
AuthorDate: Thu Jul 7 16:33:16 2022 -0400

    HubSpot Backport: HBASE-27078 Allow configuring a separate timeout for meta scans (#4585)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    Signed-off-by: Andrew Purtell <apurt...@apache.org>
---
 .../hbase/client/AsyncConnectionConfiguration.java |  36 +-
 .../hadoop/hbase/client/AsyncTableBuilderBase.java |   7 +-
 .../hbase/client/ClientAsyncPrefetchScanner.java   |   8 +-
 .../apache/hadoop/hbase/client/ClientScanner.java  |  17 +-
 .../hadoop/hbase/client/ClientSimpleScanner.java   |   8 +-
 .../hbase/client/ConnectionConfiguration.java      |  31 ++
 .../hbase/client/ConnectionImplementation.java     |   3 +-
 .../org/apache/hadoop/hbase/client/HTable.java     |  31 +-
 .../client/ResultBoundedCompletionService.java     |  21 +-
 .../hadoop/hbase/client/ReversedClientScanner.java |   8 +-
 .../client/RpcRetryingCallerWithReadReplicas.java  |   2 +-
 .../hbase/client/ScannerCallableWithReplicas.java  |  14 +-
 .../hadoop/hbase/client/TableBuilderBase.java      |   6 +
 .../hadoop/hbase/client/TestClientScanner.java     |   6 +-
 .../hbase/client/TestClientScannerTimeouts.java    | 488 +++++++++++++++++++++
 .../regionserver/TestScannerHeartbeatMessages.java |  15 +-
 16 files changed, 634 insertions(+), 67 deletions(-)
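
For readers who want to try the new knobs before reading the patch: below is a minimal sketch (not part of this commit) of a client opting into the meta-specific timeouts. The property names "hbase.client.meta.read.rpc.timeout" and "hbase.client.meta.scanner.timeout.period" come from ConnectionConfiguration in the diff; the millisecond values are illustrative assumptions only, and unset keys fall back to the normal read RPC and scanner timeouts.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class MetaScanTimeoutExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Meta (system table) scans get their own budget; when these keys are unset they
        // fall back to hbase.rpc.read.timeout and hbase.client.scanner.timeout.period.
        conf.setInt("hbase.client.meta.read.rpc.timeout", 10000);       // illustrative value
        conf.setInt("hbase.client.meta.scanner.timeout.period", 30000); // illustrative value
        // User-table scans keep using the existing keys.
        conf.setInt("hbase.client.scanner.timeout.period", 120000);     // illustrative value
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
          // Scans routed to hbase:meta through this connection pick up the meta-specific
          // timeouts via ConnectionConfiguration / TableBuilderBase / AsyncTableBuilderBase.
        }
      }
    }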

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 69616649d57..ca16d19ba26 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PE
 import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
 import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
+
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -56,6 +59,9 @@ class AsyncConnectionConfiguration {
   // timeout for each read rpc request
   private final long readRpcTimeoutNs;
 
+  // timeout for each read rpc request against system tables
+  private final long metaReadRpcTimeoutNs;
+
   // timeout for each write rpc request
   private final long writeRpcTimeoutNs;
 
@@ -73,6 +79,7 @@ class AsyncConnectionConfiguration {
   // client that it is still alive. The scan timeout is used as operation timeout for every
   // operations in a scan, such as openScanner or next.
   private final long scanTimeoutNs;
+  private final long metaScanTimeoutNs;
 
   private final int scannerCaching;
 
@@ -110,12 +117,13 @@ class AsyncConnectionConfiguration {
       connectionConf.getMetaOperationTimeout());
     this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout());
     this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout());
-    this.readRpcTimeoutNs =
-      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
-        connectionConf.getReadRpcTimeout()));
-    this.writeRpcTimeoutNs =
-      TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
-        connectionConf.getWriteRpcTimeout()));
+    long readRpcTimeoutMillis =
+      conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getRpcTimeout());
+    this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis);
+    this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS
+      .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis));
+    this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS
+      .toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, connectionConf.getWriteRpcTimeout()));
     this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis());
     this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos(
       connectionConf.getPauseMillisForServerOverloaded());
@@ -125,9 +133,11 @@ class AsyncConnectionConfiguration {
       connectionConf.getReplicaCallTimeoutMicroSecondScan());
     this.primaryMetaScanTimeoutNs =
       TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan());
-    this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
-      conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
-        DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD));
+    long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+    this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis);
+    this.metaScanTimeoutNs = TimeUnit.MILLISECONDS
+      .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis));
 
     // fields not in connection configuration
     this.startLogErrorsCnt =
@@ -152,6 +162,10 @@ class AsyncConnectionConfiguration {
     return readRpcTimeoutNs;
   }
 
+  long getMetaReadRpcTimeoutNs() {
+    return metaReadRpcTimeoutNs;
+  }
+
   long getWriteRpcTimeoutNs() {
     return writeRpcTimeoutNs;
   }
@@ -176,6 +190,10 @@ class AsyncConnectionConfiguration {
     return scanTimeoutNs;
   }
 
+  long getMetaScanTimeoutNs() {
+    return metaScanTimeoutNs;
+  }
+
   int getScannerCaching() {
     return scannerCaching;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
index bec9f123690..00daeafb37f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -55,9 +55,12 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase>
     this.tableName = tableName;
     this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs()
       : connConf.getOperationTimeoutNs();
-    this.scanTimeoutNs = connConf.getScanTimeoutNs();
+    this.scanTimeoutNs =
+      tableName.isSystemTable() ? connConf.getMetaScanTimeoutNs() : connConf.getScanTimeoutNs();
     this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
-    this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
+    this.readRpcTimeoutNs = tableName.isSystemTable()
+      ? connConf.getMetaReadRpcTimeoutNs()
+      : connConf.getReadRpcTimeoutNs();
     this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
     this.pauseNs = connConf.getPauseNs();
     this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index 0c832acdb37..378fe8d2855 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -60,11 +60,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
   private final Condition notFull = lock.newCondition();
 
   public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
-      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
-      int replicaCallTimeoutMicroSecondScan) throws IOException {
+    ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
+    RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
+    int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
     super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
-        replicaCallTimeoutMicroSecondScan);
+      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
     exceptionsQueue = new ConcurrentLinkedQueue<>();
     Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher");
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index c799e5b3872..f29ce95181c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -70,6 +70,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected final long maxScannerResultSize;
   private final ClusterConnection connection;
   protected final TableName tableName;
+  protected final int readRpcTimeout;
   protected final int scannerTimeout;
   protected boolean scanMetricsPublished = false;
   protected RpcRetryingCaller<Result[]> caller;
@@ -94,9 +95,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
    * @throws IOException
    */
   public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
-      throws IOException {
+    ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+    RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
+    int scannerTimeout, int primaryOperationTimeout) throws IOException {
     if (LOG.isTraceEnabled()) {
       LOG.trace(
         "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -115,8 +116,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
       this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
     }
-    this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
-        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+    this.readRpcTimeout = scanReadRpcTimeout;
+    this.scannerTimeout = scannerTimeout;
 
     // check if application wants to collect scan metrics
     initScanMetrics(scan);
@@ -243,9 +244,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
     // clear the current region, we will set a new value to it after the first call of the new
     // callable.
     this.currentRegion = null;
-    this.callable =
-        new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool,
-            primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller);
+    this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
+      createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
+      scannerTimeout, caching, conf, caller);
     this.callable.setCaching(this.caching);
     incRegionCountMetrics(scanMetrics);
     return true;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index 2211f8696ef..33b31213f94 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 @InterfaceAudience.Private
 public class ClientSimpleScanner extends ClientScanner {
   public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
-      RpcControllerFactory rpcControllerFactory, ExecutorService pool,
-      int replicaCallTimeoutMicroSecondScan) throws IOException {
+    ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
+    RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout,
+    int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException {
     super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool,
-        replicaCallTimeoutMicroSecondScan);
+      scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index eee2139796f..ae1e06567d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -66,6 +66,11 @@ public class ConnectionConfiguration {
       HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED);
   }
 
+  public static final String HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY =
+    "hbase.client.meta.read.rpc.timeout";
+  public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
+    "hbase.client.meta.scanner.timeout.period";
+
   private final long writeBufferSize;
   private final long writeBufferPeriodicFlushTimeoutMs;
   private final long writeBufferPeriodicFlushTimerTickMs;
@@ -80,7 +85,11 @@ public class ConnectionConfiguration {
   private final int maxKeyValueSize;
   private final int rpcTimeout;
   private final int readRpcTimeout;
+  private final int metaReadRpcTimeout;
   private final int writeRpcTimeout;
+  private final int scanTimeout;
+  private final int metaScanTimeout;
+
   // toggle for async/sync prefetch
   private final boolean clientScannerAsyncPrefetch;
   private final long pauseMs;
@@ -138,10 +147,17 @@ public class ConnectionConfiguration {
     this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
         conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
 
+    this.metaReadRpcTimeout = conf.getInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeout);
+
     this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
         conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
 
 
+    this.scanTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+      HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
+
+    this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout);
+
     long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE);
     long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
       conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs));
@@ -178,8 +194,11 @@ public class ConnectionConfiguration {
     this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH;
     this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
     this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+    this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
     this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
     this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+    this.scanTimeout = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD;
+    this.metaScanTimeout = scanTimeout;
     this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
     this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
   }
@@ -188,6 +207,10 @@ public class ConnectionConfiguration {
     return readRpcTimeout;
   }
 
+  public int getMetaReadRpcTimeout() {
+    return metaReadRpcTimeout;
+  }
+
   public int getWriteRpcTimeout() {
     return writeRpcTimeout;
   }
@@ -248,6 +271,14 @@ public class ConnectionConfiguration {
     return rpcTimeout;
   }
 
+  public int getScanTimeout() {
+    return scanTimeout;
+  }
+
+  public int getMetaScanTimeout() {
+    return metaScanTimeout;
+  }
+
   public long getPauseMillis() {
     return pauseMs;
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index e5ee051776a..33797e763a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -934,7 +934,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
         s.resetMvccReadPoint();
         try (ReversedClientScanner rcs =
           new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory,
-            rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) {
+            rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(),
+            connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) {
           boolean tableNotFound = true;
           for (;;) {
             Result regionInfoRow = rcs.next();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a04fd26df65..5cb5f3bc1aa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -114,6 +114,9 @@ public class HTable implements Table {
   private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX
   private int readRpcTimeoutMs; // timeout for each read rpc request
   private int writeRpcTimeoutMs; // timeout for each write rpc request
+
+  private final int scanReadRpcTimeout;
+  private final int scanTimeout;
   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
   private final HRegionLocator locator;
 
@@ -187,6 +190,8 @@ public class HTable implements Table {
     this.rpcTimeoutMs = builder.rpcTimeout;
     this.readRpcTimeoutMs = builder.readRpcTimeout;
     this.writeRpcTimeoutMs = builder.writeRpcTimeout;
+    this.scanReadRpcTimeout = builder.scanReadRpcTimeout;
+    this.scanTimeout = builder.scanTimeout;
     this.scannerCaching = connConfiguration.getScannerCaching();
     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
 
@@ -306,24 +311,24 @@ public class HTable implements Table {
       // it is not supposed to be set by user, clear
       scan.resetMvccReadPoint();
     }
-    Boolean async = scan.isAsyncPrefetch();
-    if (async == null) {
-      async = connConfiguration.isClientScannerAsyncPrefetch();
-    }
+    final boolean async = scan.isAsyncPrefetch() != null
+      ? scan.isAsyncPrefetch()
+      : connConfiguration.isClientScannerAsyncPrefetch();
+    final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan();
 
     if (scan.isReversed()) {
-      return new ReversedClientScanner(getConfiguration(), scan, getName(),
-        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
-        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
+      return new ReversedClientScanner(getConfiguration(), scan, getName(), connection,
+        rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
+        replicaTimeout);
     } else {
       if (async) {
-        return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection,
-            this.rpcCallerFactory, this.rpcControllerFactory,
-            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
+        return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection,
+          rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
+          replicaTimeout);
       } else {
-        return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection,
-            this.rpcCallerFactory, this.rpcControllerFactory,
-            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
+        return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection,
+          rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout,
+          replicaTimeout);
       }
     }
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
index 965b13c2134..0e798c62d20 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -1,4 +1,4 @@
-/**
+ /**
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -59,16 +59,17 @@ public class ResultBoundedCompletionService<V> {
     private T result = null;
     private ExecutionException exeEx = null;
     private volatile boolean cancelled = false;
-    private final int callTimeout;
+    private final int operationTimeout;
     private final RpcRetryingCaller<T> retryingCaller;
     private boolean resultObtained = false;
     private final int replicaId;  // replica id
 
 
-    public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) {
+    public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout,
+      int id) {
       this.future = future;
-      this.callTimeout = callTimeout;
-      this.retryingCaller = retryingCallerFactory.<T>newCaller();
+      this.operationTimeout = operationTimeout;
+      this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout);
       this.replicaId = id;
     }
 
@@ -77,7 +78,7 @@ public class ResultBoundedCompletionService<V> {
     public void run() {
       try {
         if (!cancelled) {
-          result = this.retryingCaller.callWithRetries(future, callTimeout);
+          result = this.retryingCaller.callWithRetries(future, operationTimeout);
           resultObtained = true;
         }
       } catch (Throwable t) {
@@ -165,10 +166,10 @@ public class ResultBoundedCompletionService<V> {
     this.completedTasks = new ArrayList<>(maxTasks);
   }
 
-
-  public void submit(RetryingCallable<V> task, int callTimeout, int id) {
-    QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id);
-    executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit"));
+  public void submit(RetryingCallable<V> task, int rpcTimeout, int operationTimeout, int id) {
+    QueueingFuture<V> newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id);
+    // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable
+    executor.execute(newFuture);
     tasks[id] = newFuture;
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index c0bf4b8d263..b10395f1bff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -39,11 +39,11 @@ public class ReversedClientScanner extends ClientScanner {
    * {@link Scan}'s start row maybe changed.
    */
   public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName,
-      ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
-      RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
-      throws IOException {
+    ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
+    RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout,
+    int scannerTimeout, int primaryOperationTimeout) throws IOException {
     super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-        primaryOperationTimeout);
+      scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout);
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 260cb8c26f2..674f96f5a9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -314,7 +314,7 @@ public class RpcRetryingCallerWithReadReplicas {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
       ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
-      cs.submit(callOnReplica, operationTimeout, id);
+      cs.submit(callOnReplica, rpcTimeout, operationTimeout, id);
     }
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 636ca374d3b..888e9ccb5e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -68,15 +68,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
   private final RpcRetryingCaller<Result[]> caller;
   private final TableName tableName;
   private Configuration conf;
-  private int scannerTimeout;
+  private final int scannerTimeout;
+  private final int readRpcTimeout;
   private Set<ScannerCallable> outstandingCallables = new HashSet<>();
   private boolean someRPCcancelled = false; //required for testing purposes only
   private int regionReplication = 0;
 
   public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
-      ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
-      int retries, int scannerTimeout, int caching, Configuration conf,
-      RpcRetryingCaller<Result []> caller) {
+    ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
+    int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf,
+    RpcRetryingCaller<Result[]> caller) {
     this.currentScannerCallable = baseCallable;
     this.cConnection = cConnection;
     this.pool = pool;
@@ -88,6 +89,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
     this.retries = retries;
     this.tableName = tableName;
     this.conf = conf;
+    this.readRpcTimeout = readRpcTimeout;
     this.scannerTimeout = scannerTimeout;
     this.caller = caller;
   }
@@ -326,7 +328,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) {
     RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
     outstandingCallables.add(currentScannerCallable);
-    cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
+    cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id);
   }
 
   private void addCallsForOtherReplicas(
@@ -340,7 +342,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
       setStartRowForReplicaCallable(s);
       outstandingCallables.add(s);
       RetryingRPC retryingOnReplica = new RetryingRPC(s);
-      cs.submit(retryingOnReplica, scannerTimeout, id);
+      cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
     }
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
index fa543c06244..585b859a606 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java
@@ -35,6 +35,8 @@ abstract class TableBuilderBase implements TableBuilder {
   protected int readRpcTimeout;
 
   protected int writeRpcTimeout;
+  protected final int scanReadRpcTimeout;
+  protected int scanTimeout;
 
   TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) {
     if (tableName == null) {
@@ -45,6 +47,10 @@ abstract class TableBuilderBase implements TableBuilder {
         : connConf.getOperationTimeout();
     this.rpcTimeout = connConf.getRpcTimeout();
     this.readRpcTimeout = connConf.getReadRpcTimeout();
+    this.scanReadRpcTimeout =
+      tableName.isSystemTable() ? connConf.getMetaReadRpcTimeout() : readRpcTimeout;
+    this.scanTimeout =
+      tableName.isSystemTable() ? connConf.getMetaScanTimeout() : connConf.getScanTimeout();
     this.writeRpcTimeout = connConf.getWriteRpcTimeout();
   }
 
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 244abe01139..4d51f6f789a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.RegionLocations;
@@ -107,7 +108,8 @@ public class TestClientScanner {
         RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout)
         throws IOException {
       super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool,
-          primaryOperationTimeout);
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout);
     }
 
     @Override
@@ -495,7 +497,7 @@ public class TestClientScanner {
     }
 
     @Override
-    public <T> RpcRetryingCaller<T> newCaller() {
+    public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) {
       return new RpcRetryingCaller<T>() {
         @Override
         public void cancel() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
new file mode 100644
index 00000000000..2bff2297ff5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestClientScannerTimeouts {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestClientScannerTimeouts.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static AsyncConnection ASYNC_CONN;
+  private static Connection CONN;
+  private static final byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static final byte[] VALUE = Bytes.toBytes("testValue");
+
+  private static final byte[] ROW0 = Bytes.toBytes("row-0");
+  private static final byte[] ROW1 = Bytes.toBytes("row-1");
+  private static final byte[] ROW2 = Bytes.toBytes("row-2");
+  private static final byte[] ROW3 = Bytes.toBytes("row-3");
+  private static final int rpcTimeout = 1000;
+  private static final int scanTimeout = 3 * rpcTimeout;
+  private static final int metaScanTimeout = 6 * rpcTimeout;
+  private static final int CLIENT_RETRIES_NUMBER = 3;
+
+  private static TableName tableName;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // Don't report so often so easier to see other rpcs
+    conf.setInt("hbase.regionserver.msginterval", 3 * 10000);
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout);
+    conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName());
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
+    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
+    TEST_UTIL.startMiniCluster(1);
+
+    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
+    conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
+    conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
+    ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
+    CONN = ConnectionFactory.createConnection(conf);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    CONN.close();
+    ASYNC_CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  public void setup(boolean isSystemTable) throws IOException {
+    RSRpcServicesWithScanTimeout.reset();
+
+    String nameAsString = name.getMethodName();
+    if (isSystemTable) {
+      nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString;
+    }
+    tableName = TableName.valueOf(nameAsString);
+    TEST_UTIL.createTable(tableName, FAMILY);
+
+    Table table = CONN.getTable(tableName);
+    putToTable(table, ROW0);
+    putToTable(table, ROW1);
+    putToTable(table, ROW2);
+    putToTable(table, ROW3);
+    LOG.info("Wrote our four values");
+
+    table.getRegionLocator().getAllRegionLocations();
+
+    // reset again incase the creation/population caused anything to trigger
+    RSRpcServicesWithScanTimeout.reset();
+  }
+
+  private void expectRow(byte[] expected, Result result) {
+    assertTrue("Expected row: " + Bytes.toString(expected),
+      Bytes.equals(expected, result.getRow()));
+  }
+
+  private void expectNumTries(int expected) {
+    assertEquals(
+      "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber,
+      expected, RSRpcServicesWithScanTimeout.tryNumber);
+    // reset for next
+    RSRpcServicesWithScanTimeout.tryNumber = 0;
+  }
+
+  /**
+   * verify that we don't miss any data when encountering an OutOfOrderScannerNextException.
+   * Typically, the only way to naturally trigger this is if a client-side timeout causes an
+   * erroneous next() call. This is relatively hard to do these days because the server attempts to
+   * always return before the timeout. In this test we force the server to throw this exception, so
+   * that we can test the retry logic appropriately.
+   */
+  @Test
+  public void testRetryOutOfOrderScannerNextException() throws IOException {
+    expectRetryOutOfOrderScannerNext(() -> getScanner(CONN));
+  }
+
+  /**
+   * AsyncTable version of above
+   */
+  @Test
+  public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException {
+    expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
+  }
+
+  /**
+   * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a
+   * special connection which has retries disabled, because otherwise the scanner will retry the
+   * timed out next() call and mess up the test.
+   */
+  @Test
+  public void testNormalScanTimeoutOnNext() throws IOException {
+    setup(false);
+    // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and
+    // will retry until scannerTimeout. This makes it more difficult to test the timeouts
+    // of normal next() calls. So we use a separate connection here which has retries disabled.
+    Configuration confNoRetries = new Configuration(CONN.getConfiguration());
+    confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+    try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) {
+      expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
+    }
+  }
+
+  /**
+   * AsyncTable version of above
+   */
+  @Test
+  public void testNormalScanTimeoutOnNextAsync() throws IOException {
+    setup(false);
+    expectTimeoutOnNext(scanTimeout, this::getAsyncScanner);
+  }
+
+  /**
+   * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for 
openScanner() calls for
+   * meta scans
+   */
+  @Test
+  public void testNormalScanTimeoutOnOpenScanner() throws IOException {
+    setup(false);
+    expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner);
+  }
+
+  /**
+   * AsyncTable version of above
+   */
+  @Test
+  public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException {
+    setup(false);
+    expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner);
+  }
+
+  /**
+   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for
+   * next() calls in meta scans
+   */
+  @Test
+  public void testMetaScanTimeoutOnNext() throws IOException {
+    setup(true);
+    expectTimeoutOnNext(metaScanTimeout, this::getScanner);
+  }
+
+  /**
+   * AsyncTable version of above
+   */
+  @Test
+  public void testMetaScanTimeoutOnNextAsync() throws IOException {
+    setup(true);
+    expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner);
+  }
+
+  /**
+   * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for
+   * openScanner() calls for meta scans
+   */
+  @Test
+  public void testMetaScanTimeoutOnOpenScanner() throws IOException {
+    setup(true);
+    expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
+  }
+
+  /**
+   * AsyncTable version of above
+   */
+  @Test
+  public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
+    setup(true);
+    expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
+  }
+
+  private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier)
+    throws IOException {
+    setup(false);
+    RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1;
+
+    LOG.info(
+      "Opening scanner, expecting no errors from first next() call from openScanner response");
+    ResultScanner scanner = scannerSupplier.get();
+    Result result = scanner.next();
+    expectRow(ROW0, result);
+    expectNumTries(0);
+
+    LOG.info("Making first next() RPC, expecting no errors for seqNo 0");
+    result = scanner.next();
+    expectRow(ROW1, result);
+    expectNumTries(0);
+
+    LOG.info(
+      "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry");
+    result = scanner.next();
+    expectRow(ROW2, result);
+    expectNumTries(1);
+
+    // reset so no errors. since last call restarted the scan and following
+    // call would otherwise fail
+    RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1;
+
+    LOG.info("Finishing scan, expecting no errors");
+    result = scanner.next();
+    expectRow(ROW3, result);
+    scanner.close();
+
+    LOG.info("Testing always throw exception");
+    byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 };
+    int i = 0;
+
+    // test the case that RPC always throws
+    scanner = scannerSupplier.get();
+    RSRpcServicesWithScanTimeout.throwAlways = true;
+
+    while (true) {
+      LOG.info("Calling scanner.next()");
+      result = scanner.next();
+      if (result == null) {
+        break;
+      } else {
+        byte[] expectedResult = expectedResults[i++];
+        expectRow(expectedResult, result);
+      }
+    }
+
+    // ensure we verified all rows. this along with the expectRow check above
+    // proves that we didn't miss any rows.
+    assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length
+      + ", actual index=" + i, expectedResults.length, i);
+
+    // expect all but the first row (which came from initial openScanner) to have thrown an error
+    expectNumTries(expectedResults.length - 1);
+
+  }
+
+  private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier)
+    throws IOException {
+    RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1;
+    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
+
+    LOG.info(
+      "Opening scanner, expecting no timeouts from first next() call from openScanner response");
+    ResultScanner scanner = scannerSupplier.get();
+    Result result = scanner.next();
+    expectRow(ROW0, result);
+
+    LOG.info("Making first next() RPC, expecting no timeout for seqNo 0");
+    result = scanner.next();
+    expectRow(ROW1, result);
+
+    LOG.info("Making second next() RPC, expecting timeout");
+    long start = System.nanoTime();
+    try {
+      scanner.next();
+      fail("Expected CallTimeoutException");
+    } catch (RetriesExhaustedException e) {
+      assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException
+        || e.getCause() instanceof SocketTimeoutException);
+    }
+    expectTimeout(start, timeout);
+  }
+
+  private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier)
+    throws IOException {
+    RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
+    RSRpcServicesWithScanTimeout.sleepOnOpen = true;
+    LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response");
+    long start = System.nanoTime();
+    try {
+      scannerSupplier.get().next();
+      fail("Expected SocketTimeoutException or CallTimeoutException");
+    } catch (RetriesExhaustedException e) {
+      LOG.info("Got error", e);
+      assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(),
+        e.getCause() instanceof CallTimeoutException
+          || e.getCause() instanceof SocketTimeoutException);
+    }
+    expectTimeout(start, timeout);
+  }
+
+  private void expectTimeout(long start, int timeout) {
+    long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+    LOG.info("Expected duration >= {}, and got {}", timeout, duration);
+    assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout);
+  }
+
+  private ResultScanner getScanner() {
+    return getScanner(CONN);
+  }
+
+  private ResultScanner getScanner(Connection conn) {
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    try {
+      return conn.getTable(tableName).getScanner(scan);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private ResultScanner getAsyncScanner() {
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    return ASYNC_CONN.getTable(tableName).getScanner(scan);
+  }
+
+  private void putToTable(Table ht, byte[] rowkey) throws IOException {
+    Put put = new Put(rowkey);
+    put.addColumn(FAMILY, QUALIFIER, VALUE);
+    ht.put(put);
+  }
+
+  private static class RegionServerWithScanTimeout
+    extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
+    public RegionServerWithScanTimeout(Configuration conf)
+      throws IOException, InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    protected RSRpcServices createRpcServices() throws IOException {
+      return new RSRpcServicesWithScanTimeout(this);
+    }
+  }
+
+  private static class RSRpcServicesWithScanTimeout extends RSRpcServices {
+    private long tableScannerId;
+
+    private static long seqNoToThrowOn = -1;
+    private static boolean throwAlways = false;
+    private static boolean threw;
+
+    private static long seqNoToSleepOn = -1;
+    private static boolean sleepOnOpen = false;
+    private static volatile boolean slept;
+    private static int tryNumber = 0;
+
+    private static int sleepTime = rpcTimeout + 500;
+
+    public static void setSleepForTimeout(int timeout) {
+      sleepTime = timeout + 500;
+    }
+
+    public static void reset() {
+      setSleepForTimeout(scanTimeout);
+
+      seqNoToSleepOn = -1;
+      seqNoToThrowOn = -1;
+      throwAlways = false;
+      threw = false;
+      sleepOnOpen = false;
+      slept = false;
+      tryNumber = 0;
+    }
+
+    public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException {
+      super(rs);
+    }
+
+    @Override
+    public ScanResponse scan(final RpcController controller, final ScanRequest request)
+      throws ServiceException {
+      if (request.hasScannerId()) {
+        LOG.info("Got request {}", request);
+        ScanResponse scanResponse = super.scan(controller, request);
+        if (tableScannerId != request.getScannerId() || request.getCloseScanner()) {
+          return scanResponse;
+        }
+
+        if (
+          throwAlways
+            || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq())
+        ) {
+          threw = true;
+          tryNumber++;
+          LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber,
+            tableScannerId);
+          throw new ServiceException(new OutOfOrderScannerNextException());
+        }
+
+        if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) {
+          try {
+            LOG.info("SLEEPING " + sleepTime);
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+          }
+          slept = true;
+          tryNumber++;
+        }
+        return scanResponse;
+      } else {
+        ScanResponse scanRes = super.scan(controller, request);
+        String regionName = Bytes.toString(request.getRegion().getValue().toByteArray());
+        if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) {
+          tableScannerId = scanRes.getScannerId();
+          if (sleepOnOpen) {
+            try {
+              LOG.info("openScanner SLEEPING " + sleepTime);
+              Thread.sleep(sleepTime);
+            } catch (InterruptedException e) {
+            }
+          }
+        }
+        return scanRes;
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index 60dfac56946..02dbb76568c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
@@ -91,6 +93,7 @@ public class TestScannerHeartbeatMessages {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
   private static Table TABLE = null;
+  private static Connection CONN = null;
 
   /**
    * Table configuration
@@ -134,6 +137,7 @@ public class TestScannerHeartbeatMessages {
 
     conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName());
     conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName());
+    // setting these here for usage on the server side. will override for client side below
     conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT);
     conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT);
     conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1);
@@ -142,6 +146,10 @@ public class TestScannerHeartbeatMessages {
     conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
     TEST_UTIL.startMiniCluster(1);
 
+    // set client timeout for client side, we want it to be less than server side.
+    Configuration clientConf = new Configuration(conf);
+    clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
+    CONN = ConnectionFactory.createConnection(clientConf);
     TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
   }
 
@@ -180,11 +188,11 @@ public class TestScannerHeartbeatMessages {
   }
 
   static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
-      byte[][] qualifiers, byte[] cellValue) throws IOException {
-    Table ht = TEST_UTIL.createTable(name, families);
+    byte[][] qualifiers, byte[] cellValue) throws IOException {
+    TEST_UTIL.createTable(name, families);
+    Table ht = CONN.getTable(name);
     List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
     ht.put(puts);
-    ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
     return ht;
   }
 
@@ -212,6 +220,7 @@ public class TestScannerHeartbeatMessages {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    CONN.close();
     TEST_UTIL.shutdownMiniCluster();
   }
 
