This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 82c354eea5c HBASE-28085 Configurably use scanner timeout as rpc
timeout for scanner next calls (#5402)
82c354eea5c is described below
commit 82c354eea5c336ed4ec2e86532089ae48c1f517a
Author: Bryan Beaudreault <[email protected]>
AuthorDate: Wed Sep 20 17:17:51 2023 -0400
HBASE-28085 Configurably use scanner timeout as rpc timeout for scanner
next calls (#5402)
Signed-off-by: Nick Dimiduk <[email protected]>
Signed-off-by: Duo Zhang <[email protected]>
---
.../hbase/client/ClientAsyncPrefetchScanner.java | 6 +-
.../apache/hadoop/hbase/client/ClientScanner.java | 17 +-
.../hadoop/hbase/client/ClientSimpleScanner.java | 6 +-
.../hbase/client/ConnectionConfiguration.java | 16 ++
.../hbase/client/ConnectionImplementation.java | 2 +-
.../org/apache/hadoop/hbase/client/HTable.java | 6 +-
.../hadoop/hbase/client/ReversedClientScanner.java | 6 +-
.../hbase/client/ScannerCallableWithReplicas.java | 49 +++++-
.../hadoop/hbase/client/TestClientScanner.java | 18 +-
.../hbase/client/TestClientScannerTimeouts.java | 183 ++++++++++++++++++---
10 files changed, 247 insertions(+), 62 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index 769931b7083..abd1267ffc4 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -66,9 +66,11 @@ public class ClientAsyncPrefetchScanner extends
ClientSimpleScanner {
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int
scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
- Map<String, byte[]> requestAttributes) throws IOException {
+ ConnectionConfiguration connectionConfiguration, Map<String, byte[]>
requestAttributes)
+ throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory,
rpcControllerFactory, pool,
- scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
requestAttributes);
+ scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
+ connectionConfiguration, requestAttributes);
exceptionsQueue = new ConcurrentLinkedQueue<>();
final Context context = Context.current();
final Runnable runnable = context.wrap(new PrefetchRunnable());
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 33cfedc362a..ef8e4b0404f 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
@@ -78,6 +77,7 @@ public abstract class ClientScanner extends
AbstractClientScanner {
protected final TableName tableName;
protected final int readRpcTimeout;
protected final int scannerTimeout;
+ private final boolean useScannerTimeoutForNextCalls;
protected boolean scanMetricsPublished = false;
protected RpcRetryingCaller<Result[]> caller;
protected RpcControllerFactory rpcControllerFactory;
@@ -104,7 +104,8 @@ public abstract class ClientScanner extends
AbstractClientScanner {
public ClientScanner(final Configuration conf, final Scan scan, final
TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int
scanReadRpcTimeout,
- int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]>
requestAttributes)
+ int scannerTimeout, int primaryOperationTimeout,
+ ConnectionConfiguration connectionConfiguration, Map<String, byte[]>
requestAttributes)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace(
@@ -116,16 +117,15 @@ public abstract class ClientScanner extends
AbstractClientScanner {
this.connection = connection;
this.pool = pool;
this.primaryOperationTimeout = primaryOperationTimeout;
- this.retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ this.retries = connectionConfiguration.getRetriesNumber();
if (scan.getMaxResultSize() > 0) {
this.maxScannerResultSize = scan.getMaxResultSize();
} else {
- this.maxScannerResultSize =
conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
- HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+ this.maxScannerResultSize =
connectionConfiguration.getScannerMaxResultSize();
}
this.readRpcTimeout = scanReadRpcTimeout;
this.scannerTimeout = scannerTimeout;
+ this.useScannerTimeoutForNextCalls =
connectionConfiguration.isUseScannerTimeoutForNextCalls();
this.requestAttributes = requestAttributes;
// check if application wants to collect scan metrics
@@ -135,8 +135,7 @@ public abstract class ClientScanner extends
AbstractClientScanner {
if (this.scan.getCaching() > 0) {
this.caching = this.scan.getCaching();
} else {
- this.caching = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
- HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+ this.caching = connectionConfiguration.getScannerCaching();
}
this.caller = rpcFactory.<Result[]> newCaller();
@@ -255,7 +254,7 @@ public abstract class ClientScanner extends
AbstractClientScanner {
this.currentRegion = null;
this.callable = new ScannerCallableWithReplicas(getTable(),
getConnection(),
createScannerCallable(), pool, primaryOperationTimeout, scan,
getRetries(), readRpcTimeout,
- scannerTimeout, caching, conf, caller);
+ scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
this.callable.setCaching(this.caching);
incRegionCountMetrics(scanMetrics);
return true;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
index 81091ad3010..bde036f8880 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java
@@ -39,9 +39,11 @@ public class ClientSimpleScanner extends ClientScanner {
ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory,
RpcControllerFactory rpcControllerFactory, ExecutorService pool, int
scanReadRpcTimeout,
int scannerTimeout, int replicaCallTimeoutMicroSecondScan,
- Map<String, byte[]> requestAttributes) throws IOException {
+ ConnectionConfiguration connectionConfiguration, Map<String, byte[]>
requestAttributes)
+ throws IOException {
super(configuration, scan, name, connection, rpcCallerFactory,
rpcControllerFactory, pool,
- scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
requestAttributes);
+ scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan,
+ connectionConfiguration, requestAttributes);
}
@Override
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 93fa2600d89..2a6651b5dde 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -76,6 +76,12 @@ public class ConnectionConfiguration {
public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT =
"hbase.client.meta.scanner.timeout.period";
+ public static final String
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS =
+ "hbase.client.use.scanner.timeout.period.for.next.calls";
+
+ public static final boolean
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT =
+ false;
+
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
private final long writeBufferPeriodicFlushTimerTickMs;
@@ -99,6 +105,7 @@ public class ConnectionConfiguration {
private final boolean clientScannerAsyncPrefetch;
private final long pauseMs;
private final long pauseMsForServerOverloaded;
+ private final boolean useScannerTimeoutForNextCalls;
/**
* Constructor
@@ -158,6 +165,9 @@ public class ConnectionConfiguration {
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT,
scanTimeout);
+ this.useScannerTimeoutForNextCalls =
+ conf.getBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
+ HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT);
long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE,
DEFAULT_HBASE_CLIENT_PAUSE);
long pauseMsForServerOverloaded =
conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED,
@@ -201,6 +211,8 @@ public class ConnectionConfiguration {
this.metaScanTimeout = scanTimeout;
this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE;
this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE;
+ this.useScannerTimeoutForNextCalls =
+ HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS_DEFAULT;
}
public int getReadRpcTimeout() {
@@ -275,6 +287,10 @@ public class ConnectionConfiguration {
return scanTimeout;
}
+ public boolean isUseScannerTimeoutForNextCalls() {
+ return useScannerTimeoutForNextCalls;
+ }
+
public int getMetaScanTimeout() {
return metaScanTimeout;
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ff7418e39cd..70d5760df48 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1052,7 +1052,7 @@ public class ConnectionImplementation implements
ClusterConnection, Closeable {
ReversedClientScanner rcs = new ReversedClientScanner(conf, s,
TableName.META_TABLE_NAME,
this, rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(),
connectionConfig.getMetaReadRpcTimeout(),
connectionConfig.getMetaScanTimeout(),
- metaReplicaCallTimeoutScanInMicroSecond, Collections.emptyMap())) {
+ metaReplicaCallTimeoutScanInMicroSecond, connectionConfig,
Collections.emptyMap())) {
boolean tableNotFound = true;
for (;;) {
Result regionInfoRow = rcs.next();
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index cc24d80f5ed..386a7db3526 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -321,16 +321,16 @@ public class HTable implements Table {
if (scan.isReversed()) {
return new ReversedClientScanner(getConfiguration(), scan, getName(),
connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
scanTimeout,
- replicaTimeout, requestAttributes);
+ replicaTimeout, connConfiguration, requestAttributes);
} else {
if (async) {
return new ClientAsyncPrefetchScanner(getConfiguration(), scan,
getName(), connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
scanTimeout,
- replicaTimeout, requestAttributes);
+ replicaTimeout, connConfiguration, requestAttributes);
} else {
return new ClientSimpleScanner(getConfiguration(), scan, getName(),
connection,
rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout,
scanTimeout,
- replicaTimeout, requestAttributes);
+ replicaTimeout, connConfiguration, requestAttributes);
}
}
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
index 68a8e7b7406..36bbdb5b60e 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java
@@ -40,10 +40,12 @@ public class ReversedClientScanner extends ClientScanner {
public ReversedClientScanner(Configuration conf, Scan scan, TableName
tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
RpcControllerFactory controllerFactory, ExecutorService pool, int
scanReadRpcTimeout,
- int scannerTimeout, int primaryOperationTimeout, Map<String, byte[]>
requestAttributes)
+ int scannerTimeout, int primaryOperationTimeout,
+ ConnectionConfiguration connectionConfiguration, Map<String, byte[]>
requestAttributes)
throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory,
pool,
- scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout,
requestAttributes);
+ scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout,
connectionConfiguration,
+ requestAttributes);
}
@Override
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 227ad849c84..5261ff4af5c 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -57,6 +57,7 @@ class ScannerCallableWithReplicas implements
RetryingCallable<Result[]> {
AtomicBoolean replicaSwitched = new AtomicBoolean(false);
private final ClusterConnection cConnection;
protected final ExecutorService pool;
+ private final boolean useScannerTimeoutForNextCalls;
protected final int timeBeforeReplicas;
private final Scan scan;
private final int retries;
@@ -72,11 +73,12 @@ class ScannerCallableWithReplicas implements
RetryingCallable<Result[]> {
public ScannerCallableWithReplicas(TableName tableName, ClusterConnection
cConnection,
ScannerCallable baseCallable, ExecutorService pool, int
timeBeforeReplicas, Scan scan,
- int retries, int readRpcTimeout, int scannerTimeout, int caching,
Configuration conf,
- RpcRetryingCaller<Result[]> caller) {
+ int retries, int readRpcTimeout, int scannerTimeout, boolean
useScannerTimeoutForNextCalls,
+ int caching, Configuration conf, RpcRetryingCaller<Result[]> caller) {
this.currentScannerCallable = baseCallable;
this.cConnection = cConnection;
this.pool = pool;
+ this.useScannerTimeoutForNextCalls = useScannerTimeoutForNextCalls;
if (timeBeforeReplicas < 0) {
throw new IllegalArgumentException("Invalid value of operation timeout
on the primary");
}
@@ -187,9 +189,12 @@ class ScannerCallableWithReplicas implements
RetryingCallable<Result[]> {
pool, regionReplication * 5);
AtomicBoolean done = new AtomicBoolean(false);
+ // make sure we use the same rpcTimeout for current and other replicas
+ int rpcTimeoutForCall = getRpcTimeout();
+
replicaSwitched.set(false);
// submit call for the primary replica or user specified replica
- addCallsForCurrentReplica(cs);
+ addCallsForCurrentReplica(cs, rpcTimeoutForCall);
int startIndex = 0;
try {
@@ -234,7 +239,7 @@ class ScannerCallableWithReplicas implements
RetryingCallable<Result[]> {
endIndex = 1;
} else {
// TODO: this may be an overkill for large region replication
- addCallsForOtherReplicas(cs, 0, regionReplication - 1);
+ addCallsForOtherReplicas(cs, 0, regionReplication - 1,
rpcTimeoutForCall);
}
try {
@@ -326,15 +331,41 @@ class ScannerCallableWithReplicas implements
RetryingCallable<Result[]> {
return currentScannerCallable != null ? currentScannerCallable.getCursor()
: null;
}
- private void
- addCallsForCurrentReplica(ResultBoundedCompletionService<Pair<Result[],
ScannerCallable>> cs) {
+ private void addCallsForCurrentReplica(
+ ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int
rpcTimeout) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
- cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout,
currentScannerCallable.id);
+ cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout,
currentScannerCallable.id);
+ }
+
+ /**
+ * As we have a call sequence for scan, it is useless to have a different
rpc timeout which is
+ * less than the scan timeout. If the server does not respond in
time (usually this will not happen
+ * as we have heartbeat now), we will get an OutOfOrderScannerNextException
when resending the
+ * next request and the only way to fix this is to close the scanner and
open a new one.
+ * <p>
+ * The legacy behavior of ScannerCallable has been to use readRpcTimeout
despite the above. If
+ * using legacy behavior, we always use that.
+ * <p>
+ * If new behavior is enabled, we determine the rpc timeout to use based on
whether the scanner is
+ * open. If scanner is open, use scannerTimeout otherwise use readRpcTimeout.
+ */
+ private int getRpcTimeout() {
+ if (useScannerTimeoutForNextCalls) {
+ return isNextCall() ? scannerTimeout : readRpcTimeout;
+ } else {
+ return readRpcTimeout;
+ }
+ }
+
+ private boolean isNextCall() {
+ return currentScannerCallable != null && currentScannerCallable.scannerId
!= -1
+ && !currentScannerCallable.renew && !currentScannerCallable.closed;
}
private void addCallsForOtherReplicas(
- ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int
min, int max) {
+ ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, int
min, int max,
+ int rpcTimeout) {
for (int id = min; id <= max; id++) {
if (currentScannerCallable.id == id) {
@@ -344,7 +375,7 @@ class ScannerCallableWithReplicas implements
RetryingCallable<Result[]> {
setStartRowForReplicaCallable(s);
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
- cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id);
+ cs.submit(retryingOnReplica, rpcTimeout, scannerTimeout, id);
}
}
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
index 0025a4fdbdb..9b5eb91bbd5 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java
@@ -108,12 +108,12 @@ public class TestClientScanner {
public MockClientScanner(final Configuration conf, final Scan scan, final
TableName tableName,
ClusterConnection connection, RpcRetryingCallerFactory rpcFactory,
- RpcControllerFactory controllerFactory, ExecutorService pool, int
primaryOperationTimeout)
- throws IOException {
+ RpcControllerFactory controllerFactory, ExecutorService pool, int
primaryOperationTimeout,
+ ConnectionConfiguration connectionConfig) throws IOException {
super(conf, scan, tableName, connection, rpcFactory, controllerFactory,
pool,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
primaryOperationTimeout,
- Collections.emptyMap());
+ connectionConfig, Collections.emptyMap());
}
@Override
@@ -178,7 +178,7 @@ public class TestClientScanner {
try (MockClientScanner scanner =
new MockClientScanner(conf, scan,
TableName.valueOf(name.getMethodName()), clusterConn,
- rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+ rpcFactory, controllerFactory, pool, Integer.MAX_VALUE,
connectionConfig)) {
scanner.setRpcFinished(true);
@@ -242,7 +242,7 @@ public class TestClientScanner {
try (MockClientScanner scanner =
new MockClientScanner(conf, scan,
TableName.valueOf(name.getMethodName()), clusterConn,
- rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+ rpcFactory, controllerFactory, pool, Integer.MAX_VALUE,
connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);
scanner.loadCache();
@@ -305,7 +305,7 @@ public class TestClientScanner {
try (MockClientScanner scanner =
new MockClientScanner(conf, scan,
TableName.valueOf(name.getMethodName()), clusterConn,
- rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+ rpcFactory, controllerFactory, pool, Integer.MAX_VALUE,
connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);
scanner.loadCache();
@@ -376,7 +376,7 @@ public class TestClientScanner {
try (MockClientScanner scanner =
new MockClientScanner(conf, scan,
TableName.valueOf(name.getMethodName()), clusterConn,
- rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+ rpcFactory, controllerFactory, pool, Integer.MAX_VALUE,
connectionConfig)) {
scanner.setRpcFinished(true);
InOrder inOrder = Mockito.inOrder(caller);
@@ -443,7 +443,7 @@ public class TestClientScanner {
try (MockClientScanner scanner =
new MockClientScanner(conf, scan,
TableName.valueOf(name.getMethodName()), clusterConn,
- rpcFactory, controllerFactory, pool, Integer.MAX_VALUE)) {
+ rpcFactory, controllerFactory, pool, Integer.MAX_VALUE,
connectionConfig)) {
InOrder inOrder = Mockito.inOrder(caller);
scanner.setRpcFinished(true);
@@ -488,7 +488,7 @@ public class TestClientScanner {
try (MockClientScanner scanner =
new MockClientScanner(conf, scan,
TableName.valueOf(name.getMethodName()), clusterConn,
- rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE)) {
+ rpcFactory, new RpcControllerFactory(conf), pool, Integer.MAX_VALUE,
connectionConfig)) {
Iterator<Result> iter = scanner.iterator();
while (iter.hasNext()) {
iter.next();
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
index 2bff2297ff5..259bbc4ad9b 100644
---
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java
@@ -19,16 +19,20 @@ package org.apache.hadoop.hbase.client;
import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY;
import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT;
+import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.SocketTimeoutException;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
@@ -41,13 +45,17 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,6 +65,7 @@ import
org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
public class TestClientScannerTimeouts {
@@ -67,8 +76,8 @@ public class TestClientScannerTimeouts {
private static final Logger LOG =
LoggerFactory.getLogger(TestClientScannerTimeouts.class);
private final static HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
- private static AsyncConnection ASYNC_CONN;
- private static Connection CONN;
+ private AsyncConnection ASYNC_CONN;
+ private Connection CONN;
private static final byte[] FAMILY = Bytes.toBytes("testFamily");
private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier");
private static final byte[] VALUE = Bytes.toBytes("testValue");
@@ -79,7 +88,8 @@ public class TestClientScannerTimeouts {
private static final byte[] ROW3 = Bytes.toBytes("row-3");
private static final int rpcTimeout = 1000;
private static final int scanTimeout = 3 * rpcTimeout;
- private static final int metaScanTimeout = 6 * rpcTimeout;
+ private static final int metaReadRpcTimeout = 6 * rpcTimeout;
+ private static final int metaScanTimeout = 9 * rpcTimeout;
private static final int CLIENT_RETRIES_NUMBER = 3;
private static TableName tableName;
@@ -87,6 +97,14 @@ public class TestClientScannerTimeouts {
@Rule
public TestName name = new TestName();
+ @Parameterized.Parameter
+ public boolean useScannerTimeoutPeriodForNextCalls;
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> parameters() {
+ return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
+ }
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
@@ -97,25 +115,38 @@ public class TestClientScannerTimeouts {
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER);
conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000);
TEST_UTIL.startMiniCluster(1);
+ }
+ @Before
+ public void setUp() throws Exception {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout);
- conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout);
+ conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaReadRpcTimeout);
conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout);
+ conf.setBoolean(HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS,
+ useScannerTimeoutPeriodForNextCalls);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get();
CONN = ConnectionFactory.createConnection(conf);
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
+ @After
+ public void after() throws Exception {
CONN.close();
ASYNC_CONN.close();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
public void setup(boolean isSystemTable) throws IOException {
RSRpcServicesWithScanTimeout.reset();
- String nameAsString = name.getMethodName();
+ // parameterization adds non-alphanumeric chars to the method name. strip
them so
+ // it parses as a table name
+ String nameAsString = name.getMethodName().replaceAll("[^a-zA-Z0-9]", "_")
+ "-"
+ + useScannerTimeoutPeriodForNextCalls;
if (isSystemTable) {
nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" +
nameAsString;
}
@@ -168,22 +199,10 @@ public class TestClientScannerTimeouts {
expectRetryOutOfOrderScannerNext(this::getAsyncScanner);
}
- /**
- * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY}
for normal scans. Use a
- * special connection which has retries disabled, because otherwise the
scanner will retry the
- * timed out next() call and mess up the test.
- */
@Test
public void testNormalScanTimeoutOnNext() throws IOException {
setup(false);
- // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and
- // will retry until scannerTimeout. This makes it more difficult to test
the timeouts
- // of normal next() calls. So we use a separate connection here which has
retries disabled.
- Configuration confNoRetries = new Configuration(CONN.getConfiguration());
- confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
- try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) {
- expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
- }
+ testScanTimeoutOnNext(rpcTimeout, scanTimeout);
}
/**
@@ -221,7 +240,30 @@ public class TestClientScannerTimeouts {
@Test
public void testMetaScanTimeoutOnNext() throws IOException {
setup(true);
- expectTimeoutOnNext(metaScanTimeout, this::getScanner);
+ testScanTimeoutOnNext(metaReadRpcTimeout, metaScanTimeout);
+ }
+
+ private void testScanTimeoutOnNext(int rpcTimeout, int scannerTimeout)
throws IOException {
+ if (useScannerTimeoutPeriodForNextCalls) {
+ // Since this has HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS
enabled, we pass
+ // scannerTimeout as the expected timeout duration.
+ expectTimeoutOnNext(scannerTimeout, this::getScanner);
+ } else {
+ // Otherwise we pass rpcTimeout as the expected timeout duration.
+ // In this case we need a special connection which disables retries,
otherwise the scanner
+ // will retry the timed out next() call, which will cause out of order
exception and mess up
+ // the test
+ try (Connection conn = getNoRetriesConnection()) {
+ // Now since we disabled
HBASE_CLIENT_USE_SCANNER_TIMEOUT_PERIOD_FOR_NEXT_CALLS, verify rpcTimeout
+ expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn));
+ }
+ }
+ }
+
+ private Connection getNoRetriesConnection() throws IOException {
+ Configuration confNoRetries = new Configuration(CONN.getConfiguration());
+ confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0);
+ return ConnectionFactory.createConnection(confNoRetries);
}
/**
@@ -240,7 +282,7 @@ public class TestClientScannerTimeouts {
@Test
public void testMetaScanTimeoutOnOpenScanner() throws IOException {
setup(true);
- expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner);
+ expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getScanner);
}
/**
@@ -249,7 +291,51 @@ public class TestClientScannerTimeouts {
@Test
public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException {
setup(true);
- expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner);
+ expectTimeoutOnOpenScanner(metaReadRpcTimeout, this::getAsyncScanner);
+ }
+
+ /**
+ * Test renewLease timeout for non-async scanner, which should use
rpcTimeout. Async scanner does
+ * lease renewal automatically in the background, so renewLease() always
returns false. So this
+ * test doesn't have an Async counterpart like the others.
+ */
+ @Test
+ public void testNormalScanTimeoutOnRenewLease() throws IOException {
+ setup(false);
+ expectTimeoutOnRenewScanner(rpcTimeout, this::getScanner);
+ }
+
+ /**
+ * Test renewLease timeout for non-async scanner, which should use
metaReadRpcTimeout. Async scanner does
+ * lease renewal automatically in the background, so renewLease() always
returns false. So this
+ * test doesn't have an Async counterpart like the others.
+ */
+ @Test
+ public void testMetaScanTimeoutOnRenewLease() throws IOException {
+ setup(true);
+ expectTimeoutOnRenewScanner(metaReadRpcTimeout, this::getScanner);
+ }
+
+ /**
+ * Test close timeout for non-async scanner, which should use rpcTimeout.
Async scanner
+ * closes asynchronously and always returns immediately. So this test doesn't have an
Async counterpart
+ * like the others.
+ */
+ @Test
+ public void testNormalScanTimeoutOnClose() throws IOException {
+ setup(false);
+ expectTimeoutOnCloseScanner(rpcTimeout, this::getScanner);
+ }
+
+ /**
+ * Test close timeout for non-async scanner, which should use metaReadRpcTimeout.
Async scanner
+ * closes asynchronously and always returns immediately. So this test doesn't have an
Async counterpart
+ * like the others.
+ */
+ @Test
+ public void testMetaScanTimeoutOnClose() throws IOException {
+ setup(true);
+ expectTimeoutOnCloseScanner(metaReadRpcTimeout, this::getScanner);
}
private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner>
scannerSupplier)
@@ -358,6 +444,34 @@ public class TestClientScannerTimeouts {
expectTimeout(start, timeout);
}
+ private void expectTimeoutOnRenewScanner(int timeout,
Supplier<ResultScanner> scannerSupplier)
+ throws IOException {
+ RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
+ RSRpcServicesWithScanTimeout.sleepOnRenew = true;
+ LOG.info(
+ "Opening scanner, expecting no timeouts from first next() call from
openScanner response");
+ long start = System.nanoTime();
+ ResultScanner scanner = scannerSupplier.get();
+ scanner.next();
+ assertFalse("Expected renewLease to fail due to timeout",
scanner.renewLease());
+ expectTimeout(start, timeout);
+ }
+
+ private void expectTimeoutOnCloseScanner(int timeout,
Supplier<ResultScanner> scannerSupplier)
+ throws IOException {
+ RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout);
+ RSRpcServicesWithScanTimeout.sleepOnClose = true;
+ LOG.info(
+ "Opening scanner, expecting no timeouts from first next() call from
openScanner response");
+ long start = System.nanoTime();
+ ResultScanner scanner = scannerSupplier.get();
+ scanner.next();
+ // close doesn't throw or return anything, so we can't verify it directly.
+ // but we can verify that it took as long as we expect below
+ scanner.close();
+ expectTimeout(start, timeout);
+ }
+
private void expectTimeout(long start, int timeout) {
long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
LOG.info("Expected duration >= {}, and got {}", timeout, duration);
@@ -412,6 +526,8 @@ public class TestClientScannerTimeouts {
private static long seqNoToSleepOn = -1;
private static boolean sleepOnOpen = false;
+ private static boolean sleepOnRenew = false;
+ private static boolean sleepOnClose = false;
private static volatile boolean slept;
private static int tryNumber = 0;
@@ -429,6 +545,8 @@ public class TestClientScannerTimeouts {
throwAlways = false;
threw = false;
sleepOnOpen = false;
+ sleepOnRenew = false;
+ sleepOnClose = false;
slept = false;
tryNumber = 0;
}
@@ -443,7 +561,19 @@ public class TestClientScannerTimeouts {
if (request.hasScannerId()) {
LOG.info("Got request {}", request);
ScanResponse scanResponse = super.scan(controller, request);
- if (tableScannerId != request.getScannerId() ||
request.getCloseScanner()) {
+ if (tableScannerId != request.getScannerId()) {
+ return scanResponse;
+ }
+ if (request.getCloseScanner()) {
+ if (!slept && sleepOnClose) {
+ try {
+ LOG.info("SLEEPING " + sleepTime);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ }
+ slept = true;
+ tryNumber++;
+ }
return scanResponse;
}
@@ -458,7 +588,10 @@ public class TestClientScannerTimeouts {
throw new ServiceException(new OutOfOrderScannerNextException());
}
- if (!slept && request.hasNextCallSeq() && seqNoToSleepOn ==
request.getNextCallSeq()) {
+ if (
+ !slept && (request.hasNextCallSeq() && seqNoToSleepOn ==
request.getNextCallSeq()
+ || sleepOnRenew && request.getRenew())
+ ) {
try {
LOG.info("SLEEPING " + sleepTime);
Thread.sleep(sleepTime);