This is an automated email from the ASF dual-hosted git repository. bbeaudreault pushed a commit to branch hubspot-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 7023542cce0b40b2ec2a3eb1c421501858085afb Author: Bryan Beaudreault <bbeaudrea...@hubspot.com> AuthorDate: Thu Jul 7 16:33:16 2022 -0400 HubSpot Backport: HBASE-27078 Allow configuring a separate timeout for meta scans (#4585) Signed-off-by: Duo Zhang <zhang...@apache.org> Signed-off-by: Andrew Purtell <apurt...@apache.org> --- .../hbase/client/AsyncConnectionConfiguration.java | 36 +- .../hadoop/hbase/client/AsyncTableBuilderBase.java | 7 +- .../hbase/client/ClientAsyncPrefetchScanner.java | 8 +- .../apache/hadoop/hbase/client/ClientScanner.java | 17 +- .../hadoop/hbase/client/ClientSimpleScanner.java | 8 +- .../hbase/client/ConnectionConfiguration.java | 31 ++ .../hbase/client/ConnectionImplementation.java | 3 +- .../org/apache/hadoop/hbase/client/HTable.java | 31 +- .../client/ResultBoundedCompletionService.java | 21 +- .../hadoop/hbase/client/ReversedClientScanner.java | 8 +- .../client/RpcRetryingCallerWithReadReplicas.java | 2 +- .../hbase/client/ScannerCallableWithReplicas.java | 14 +- .../hadoop/hbase/client/TableBuilderBase.java | 6 + .../hadoop/hbase/client/TestClientScanner.java | 6 +- .../hbase/client/TestClientScannerTimeouts.java | 488 +++++++++++++++++++++ .../regionserver/TestScannerHeartbeatMessages.java | 15 +- 16 files changed, 634 insertions(+), 67 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 69616649d57..ca16d19ba26 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -23,6 +23,9 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PE import static org.apache.hadoop.hbase.HConstants.HBASE_META_SCANNER_CACHING; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; + import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.yetus.audience.InterfaceAudience; @@ -56,6 +59,9 @@ class AsyncConnectionConfiguration { // timeout for each read rpc request private final long readRpcTimeoutNs; + // timeout for each read rpc request against system tables + private final long metaReadRpcTimeoutNs; + // timeout for each write rpc request private final long writeRpcTimeoutNs; @@ -73,6 +79,7 @@ class AsyncConnectionConfiguration { // client that it is still alive. The scan timeout is used as operation timeout for every // operations in a scan, such as openScanner or next. private final long scanTimeoutNs; + private final long metaScanTimeoutNs; private final int scannerCaching; @@ -110,12 +117,13 @@ class AsyncConnectionConfiguration { connectionConf.getMetaOperationTimeout()); this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getOperationTimeout()); this.rpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getRpcTimeout()); - this.readRpcTimeoutNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, - connectionConf.getReadRpcTimeout())); - this.writeRpcTimeoutNs = - TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, - connectionConf.getWriteRpcTimeout())); + long readRpcTimeoutMillis = + conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, connectionConf.getRpcTimeout()); + this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(readRpcTimeoutMillis); + this.metaReadRpcTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeoutMillis)); + this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, connectionConf.getWriteRpcTimeout())); this.pauseNs = TimeUnit.MILLISECONDS.toNanos(connectionConf.getPauseMillis()); this.pauseNsForServerOverloaded = TimeUnit.MILLISECONDS.toNanos( connectionConf.getPauseMillisForServerOverloaded()); @@ -125,9 +133,11 @@ class AsyncConnectionConfiguration { connectionConf.getReplicaCallTimeoutMicroSecondScan()); this.primaryMetaScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(connectionConf.getMetaReplicaCallTimeoutMicroSecondScan()); - this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos( - conf.getInt(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD)); + long scannerTimeoutMillis = conf.getLong(HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + this.scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(scannerTimeoutMillis); + this.metaScanTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_CLIENT_META_SCANNER_TIMEOUT, scannerTimeoutMillis)); // fields not in connection configuration this.startLogErrorsCnt = @@ -152,6 +162,10 @@ class AsyncConnectionConfiguration { return readRpcTimeoutNs; } + long getMetaReadRpcTimeoutNs() { + return metaReadRpcTimeoutNs; + } + long getWriteRpcTimeoutNs() { return writeRpcTimeoutNs; } @@ -176,6 +190,10 @@ class AsyncConnectionConfiguration { return scanTimeoutNs; } + long getMetaScanTimeoutNs() { + return metaScanTimeoutNs; + } + int getScannerCaching() { return scannerCaching; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java index bec9f123690..00daeafb37f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java @@ -55,9 +55,12 @@ abstract class AsyncTableBuilderBase<C extends ScanResultConsumerBase> this.tableName = tableName; this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs() : connConf.getOperationTimeoutNs(); - this.scanTimeoutNs = connConf.getScanTimeoutNs(); + this.scanTimeoutNs = + tableName.isSystemTable() ? connConf.getMetaScanTimeoutNs() : connConf.getScanTimeoutNs(); this.rpcTimeoutNs = connConf.getRpcTimeoutNs(); - this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs(); + this.readRpcTimeoutNs = tableName.isSystemTable() + ? connConf.getMetaReadRpcTimeoutNs() + : connConf.getReadRpcTimeoutNs(); this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs(); this.pauseNs = connConf.getPauseNs(); this.pauseNsForServerOverloaded = connConf.getPauseNsForServerOverloaded(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java index 0c832acdb37..378fe8d2855 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java @@ -60,11 +60,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner { private final Condition notFull = lock.newCondition(); public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name, - ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, - RpcControllerFactory rpcControllerFactory, ExecutorService pool, - int replicaCallTimeoutMicroSecondScan) throws IOException { + ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - replicaCallTimeoutMicroSecondScan); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan); exceptionsQueue = new ConcurrentLinkedQueue<>(); Threads.setDaemonThreadRunning(new Thread(new PrefetchRunnable()), name + ".asyncPrefetcher"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index c799e5b3872..f29ce95181c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -70,6 +70,7 @@ public abstract class ClientScanner extends AbstractClientScanner { protected final long maxScannerResultSize; private final ClusterConnection connection; protected final TableName tableName; + protected final int readRpcTimeout; protected final int scannerTimeout; protected boolean scanMetricsPublished = false; protected RpcRetryingCaller<Result[]> caller; @@ -94,9 +95,9 @@ public abstract class ClientScanner extends AbstractClientScanner { * @throws IOException */ public ClientScanner(final Configuration conf, final Scan scan, final TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { + ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int primaryOperationTimeout) throws IOException { if (LOG.isTraceEnabled()) { LOG.trace( "Scan table=" + tableName + ", startRow=" + Bytes.toStringBinary(scan.getStartRow())); @@ -115,8 +116,8 @@ public abstract class ClientScanner extends AbstractClientScanner { this.maxScannerResultSize = conf.getLong(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); } - this.scannerTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, - HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + this.readRpcTimeout = scanReadRpcTimeout; + this.scannerTimeout = scannerTimeout; // check if application wants to collect scan metrics initScanMetrics(scan); @@ -243,9 +244,9 @@ public abstract class ClientScanner extends AbstractClientScanner { // clear the current region, we will set a new value to it after the first call of the new // callable. this.currentRegion = null; - this.callable = - new ScannerCallableWithReplicas(getTable(), getConnection(), createScannerCallable(), pool, - primaryOperationTimeout, scan, getRetries(), scannerTimeout, caching, conf, caller); + this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(), + createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout, + scannerTimeout, caching, conf, caller); this.callable.setCaching(this.caching); incRegionCountMetrics(scanMetrics); return true; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java index 2211f8696ef..33b31213f94 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSimpleScanner.java @@ -37,11 +37,11 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @InterfaceAudience.Private public class ClientSimpleScanner extends ClientScanner { public ClientSimpleScanner(Configuration configuration, Scan scan, TableName name, - ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, - RpcControllerFactory rpcControllerFactory, ExecutorService pool, - int replicaCallTimeoutMicroSecondScan) throws IOException { + ClusterConnection connection, RpcRetryingCallerFactory rpcCallerFactory, + RpcControllerFactory rpcControllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int replicaCallTimeoutMicroSecondScan) throws IOException { super(configuration, scan, name, connection, rpcCallerFactory, rpcControllerFactory, pool, - replicaCallTimeoutMicroSecondScan); + scanReadRpcTimeout, scannerTimeout, replicaCallTimeoutMicroSecondScan); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java index eee2139796f..ae1e06567d5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java @@ -66,6 +66,11 @@ public class ConnectionConfiguration { HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED); } + public static final String HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY = + "hbase.client.meta.read.rpc.timeout"; + public static final String HBASE_CLIENT_META_SCANNER_TIMEOUT = + "hbase.client.meta.scanner.timeout.period"; + private final long writeBufferSize; private final long writeBufferPeriodicFlushTimeoutMs; private final long writeBufferPeriodicFlushTimerTickMs; @@ -80,7 +85,11 @@ public class ConnectionConfiguration { private final int maxKeyValueSize; private final int rpcTimeout; private final int readRpcTimeout; + private final int metaReadRpcTimeout; private final int writeRpcTimeout; + private final int scanTimeout; + private final int metaScanTimeout; + // toggle for async/sync prefetch private final boolean clientScannerAsyncPrefetch; private final long pauseMs; @@ -138,10 +147,17 @@ public class ConnectionConfiguration { this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.metaReadRpcTimeout = conf.getInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, readRpcTimeout); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.scanTimeout = conf.getInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD); + + this.metaScanTimeout = conf.getInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, scanTimeout); + long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE); long pauseMsForServerOverloaded = conf.getLong(HBASE_CLIENT_PAUSE_FOR_SERVER_OVERLOADED, conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs)); @@ -178,8 +194,11 @@ public class ConnectionConfiguration { this.clientScannerAsyncPrefetch = Scan.DEFAULT_HBASE_CLIENT_SCANNER_ASYNC_PREFETCH; this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT; this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.metaReadRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; this.rpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT; + this.scanTimeout = HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD; + this.metaScanTimeout = scanTimeout; this.pauseMs = DEFAULT_HBASE_CLIENT_PAUSE; this.pauseMsForServerOverloaded = DEFAULT_HBASE_CLIENT_PAUSE; } @@ -188,6 +207,10 @@ public class ConnectionConfiguration { return readRpcTimeout; } + public int getMetaReadRpcTimeout() { + return metaReadRpcTimeout; + } + public int getWriteRpcTimeout() { return writeRpcTimeout; } @@ -248,6 +271,14 @@ public class ConnectionConfiguration { return rpcTimeout; } + public int getScanTimeout() { + return scanTimeout; + } + + public int getMetaScanTimeout() { + return metaScanTimeout; + } + public long getPauseMillis() { return pauseMs; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index e5ee051776a..33797e763a2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -934,7 +934,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { s.resetMvccReadPoint(); try (ReversedClientScanner rcs = new ReversedClientScanner(conf, s, TableName.META_TABLE_NAME, this, rpcCallerFactory, - rpcControllerFactory, getMetaLookupPool(), metaReplicaCallTimeoutScanInMicroSecond)) { + rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeout(), + connectionConfig.getMetaScanTimeout(), metaReplicaCallTimeoutScanInMicroSecond)) { boolean tableNotFound = true; for (;;) { Result regionInfoRow = rcs.next(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index a04fd26df65..5cb5f3bc1aa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -114,6 +114,9 @@ public class HTable implements Table { private final int rpcTimeoutMs; // FIXME we should use this for rpc like batch and checkAndXXX private int readRpcTimeoutMs; // timeout for each read rpc request private int writeRpcTimeoutMs; // timeout for each write rpc request + + private final int scanReadRpcTimeout; + private final int scanTimeout; private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final HRegionLocator locator; @@ -187,6 +190,8 @@ public class HTable implements Table { this.rpcTimeoutMs = builder.rpcTimeout; this.readRpcTimeoutMs = builder.readRpcTimeout; this.writeRpcTimeoutMs = builder.writeRpcTimeout; + this.scanReadRpcTimeout = builder.scanReadRpcTimeout; + this.scanTimeout = builder.scanTimeout; this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); @@ -306,24 +311,24 @@ public class HTable implements Table { // it is not supposed to be set by user, clear scan.resetMvccReadPoint(); } - Boolean async = scan.isAsyncPrefetch(); - if (async == null) { - async = connConfiguration.isClientScannerAsyncPrefetch(); - } + final boolean async = scan.isAsyncPrefetch() != null + ? scan.isAsyncPrefetch() + : connConfiguration.isClientScannerAsyncPrefetch(); + final int replicaTimeout = connConfiguration.getReplicaCallTimeoutMicroSecondScan(); if (scan.isReversed()) { - return new ReversedClientScanner(getConfiguration(), scan, getName(), - this.connection, this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ReversedClientScanner(getConfiguration(), scan, getName(), connection, + rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, + replicaTimeout); } else { if (async) { - return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), this.connection, - this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ClientAsyncPrefetchScanner(getConfiguration(), scan, getName(), connection, + rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, + replicaTimeout); } else { - return new ClientSimpleScanner(getConfiguration(), scan, getName(), this.connection, - this.rpcCallerFactory, this.rpcControllerFactory, - pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan()); + return new ClientSimpleScanner(getConfiguration(), scan, getName(), connection, + rpcCallerFactory, rpcControllerFactory, pool, scanReadRpcTimeout, scanTimeout, + replicaTimeout); } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java index 965b13c2134..0e798c62d20 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java @@ -1,4 +1,4 @@ -/** + /** * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -59,16 +59,17 @@ public class ResultBoundedCompletionService<V> { private T result = null; private ExecutionException exeEx = null; private volatile boolean cancelled = false; - private final int callTimeout; + private final int operationTimeout; private final RpcRetryingCaller<T> retryingCaller; private boolean resultObtained = false; private final int replicaId; // replica id - public QueueingFuture(RetryingCallable<T> future, int callTimeout, int id) { + public QueueingFuture(RetryingCallable<T> future, int rpcTimeout, int operationTimeout, + int id) { this.future = future; - this.callTimeout = callTimeout; - this.retryingCaller = retryingCallerFactory.<T>newCaller(); + this.operationTimeout = operationTimeout; + this.retryingCaller = retryingCallerFactory.<T> newCaller(rpcTimeout); this.replicaId = id; } @@ -77,7 +78,7 @@ public class ResultBoundedCompletionService<V> { public void run() { try { if (!cancelled) { - result = this.retryingCaller.callWithRetries(future, callTimeout); + result = this.retryingCaller.callWithRetries(future, operationTimeout); resultObtained = true; } } catch (Throwable t) { @@ -165,10 +166,10 @@ public class ResultBoundedCompletionService<V> { this.completedTasks = new ArrayList<>(maxTasks); } - - public void submit(RetryingCallable<V> task, int callTimeout, int id) { - QueueingFuture<V> newFuture = new QueueingFuture<>(task, callTimeout, id); - executor.execute(TraceUtil.wrap(newFuture, "ResultBoundedCompletionService.submit")); + public void submit(RetryingCallable<V> task, int rpcTimeout, int operationTimeout, int id) { + QueueingFuture<V> newFuture = new QueueingFuture<>(task, rpcTimeout, operationTimeout, id); + // remove trace for runnable because HBASE-25373 and OpenTelemetry do not cover TraceRunnable + executor.execute(newFuture); tasks[id] = newFuture; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java index c0bf4b8d263..b10395f1bff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ReversedClientScanner.java @@ -39,11 +39,11 @@ public class ReversedClientScanner extends ClientScanner { * {@link Scan}'s start row maybe changed. */ public ReversedClientScanner(Configuration conf, Scan scan, TableName tableName, - ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, - RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) - throws IOException { + ClusterConnection connection, RpcRetryingCallerFactory rpcFactory, + RpcControllerFactory controllerFactory, ExecutorService pool, int scanReadRpcTimeout, + int scannerTimeout, int primaryOperationTimeout) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); + scanReadRpcTimeout, scannerTimeout, primaryOperationTimeout); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 260cb8c26f2..674f96f5a9a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -314,7 +314,7 @@ public class RpcRetryingCallerWithReadReplicas { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); - cs.submit(callOnReplica, operationTimeout, id); + cs.submit(callOnReplica, rpcTimeout, operationTimeout, id); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java index 636ca374d3b..888e9ccb5e9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java @@ -68,15 +68,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { private final RpcRetryingCaller<Result[]> caller; private final TableName tableName; private Configuration conf; - private int scannerTimeout; + private final int scannerTimeout; + private final int readRpcTimeout; private Set<ScannerCallable> outstandingCallables = new HashSet<>(); private boolean someRPCcancelled = false; //required for testing purposes only private int regionReplication = 0; public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection, - ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, - int retries, int scannerTimeout, int caching, Configuration conf, - RpcRetryingCaller<Result []> caller) { + ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan, + int retries, int readRpcTimeout, int scannerTimeout, int caching, Configuration conf, + RpcRetryingCaller<Result[]> caller) { this.currentScannerCallable = baseCallable; this.cConnection = cConnection; this.pool = pool; @@ -88,6 +89,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { this.retries = retries; this.tableName = tableName; this.conf = conf; + this.readRpcTimeout = readRpcTimeout; this.scannerTimeout = scannerTimeout; this.caller = caller; } @@ -326,7 +328,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs) { RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable); outstandingCallables.add(currentScannerCallable); - cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id); + cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, currentScannerCallable.id); } private void addCallsForOtherReplicas( @@ -340,7 +342,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> { setStartRowForReplicaCallable(s); outstandingCallables.add(s); RetryingRPC retryingOnReplica = new RetryingRPC(s); - cs.submit(retryingOnReplica, scannerTimeout, id); + cs.submit(retryingOnReplica, readRpcTimeout, scannerTimeout, id); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java index fa543c06244..585b859a606 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableBuilderBase.java @@ -35,6 +35,8 @@ abstract class TableBuilderBase implements TableBuilder { protected int readRpcTimeout; protected int writeRpcTimeout; + protected final int scanReadRpcTimeout; + protected int scanTimeout; TableBuilderBase(TableName tableName, ConnectionConfiguration connConf) { if (tableName == null) { @@ -45,6 +47,10 @@ abstract class TableBuilderBase implements TableBuilder { : connConf.getOperationTimeout(); this.rpcTimeout = connConf.getRpcTimeout(); this.readRpcTimeout = connConf.getReadRpcTimeout(); + this.scanReadRpcTimeout = + tableName.isSystemTable() ? connConf.getMetaReadRpcTimeout() : readRpcTimeout; + this.scanTimeout = + tableName.isSystemTable() ? connConf.getMetaScanTimeout() : connConf.getScanTimeout(); this.writeRpcTimeout = connConf.getWriteRpcTimeout(); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java index 244abe01139..4d51f6f789a 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientScanner.java @@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.Type; import org.apache.hadoop.hbase.RegionLocations; @@ -107,7 +108,8 @@ public class TestClientScanner { RpcControllerFactory controllerFactory, ExecutorService pool, int primaryOperationTimeout) throws IOException { super(conf, scan, tableName, connection, rpcFactory, controllerFactory, pool, - primaryOperationTimeout); + HConstants.DEFAULT_HBASE_RPC_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, primaryOperationTimeout); } @Override @@ -495,7 +497,7 @@ public class TestClientScanner { } @Override - public <T> RpcRetryingCaller<T> newCaller() { + public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { return new RpcRetryingCaller<T>() { @Override public void cancel() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java new file mode 100644 index 00000000000..2bff2297ff5 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientScannerTimeouts.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY; +import static org.apache.hadoop.hbase.client.ConnectionConfiguration.HBASE_CLIENT_META_SCANNER_TIMEOUT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestClientScannerTimeouts { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestClientScannerTimeouts.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestClientScannerTimeouts.class); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static AsyncConnection ASYNC_CONN; + private static Connection CONN; + private static final byte[] FAMILY = Bytes.toBytes("testFamily"); + private static final byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static final byte[] VALUE = Bytes.toBytes("testValue"); + + private static final byte[] ROW0 = Bytes.toBytes("row-0"); + private static final byte[] ROW1 = Bytes.toBytes("row-1"); + private static final byte[] ROW2 = Bytes.toBytes("row-2"); + private static final byte[] ROW3 = Bytes.toBytes("row-3"); + private static final int rpcTimeout = 1000; + private static final int scanTimeout = 3 * rpcTimeout; + private static final int metaScanTimeout = 6 * rpcTimeout; + private static final int CLIENT_RETRIES_NUMBER = 3; + + private static TableName tableName; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + // Don't report so often so easier to see other rpcs + conf.setInt("hbase.regionserver.msginterval", 3 * 10000); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, rpcTimeout); + conf.setStrings(HConstants.REGION_SERVER_IMPL, RegionServerWithScanTimeout.class.getName()); + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES_NUMBER); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); + TEST_UTIL.startMiniCluster(1); + + conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, scanTimeout); + conf.setInt(HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY, metaScanTimeout); + conf.setInt(HBASE_CLIENT_META_SCANNER_TIMEOUT, metaScanTimeout); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(conf).get(); + CONN = ConnectionFactory.createConnection(conf); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + CONN.close(); + ASYNC_CONN.close(); + TEST_UTIL.shutdownMiniCluster(); + } + + public void setup(boolean isSystemTable) throws IOException { + RSRpcServicesWithScanTimeout.reset(); + + String nameAsString = name.getMethodName(); + if (isSystemTable) { + nameAsString = NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR + ":" + nameAsString; + } + tableName = TableName.valueOf(nameAsString); + TEST_UTIL.createTable(tableName, FAMILY); + + Table table = CONN.getTable(tableName); + putToTable(table, ROW0); + putToTable(table, ROW1); + putToTable(table, ROW2); + putToTable(table, ROW3); + LOG.info("Wrote our four values"); + + table.getRegionLocator().getAllRegionLocations(); + + // reset again incase the creation/population caused anything to trigger + RSRpcServicesWithScanTimeout.reset(); + } + + private void expectRow(byte[] expected, Result result) { + assertTrue("Expected row: " + Bytes.toString(expected), + Bytes.equals(expected, result.getRow())); + } + + private void expectNumTries(int expected) { + assertEquals( + "Expected tryNumber=" + expected + ", actual=" + RSRpcServicesWithScanTimeout.tryNumber, + expected, RSRpcServicesWithScanTimeout.tryNumber); + // reset for next + RSRpcServicesWithScanTimeout.tryNumber = 0; + } + + /** + * verify that we don't miss any data when encountering an OutOfOrderScannerNextException. + * Typically, the only way to naturally trigger this is if a client-side timeout causes an + * erroneous next() call. This is relatively hard to do these days because the server attempts to + * always return before the timeout. In this test we force the server to throw this exception, so + * that we can test the retry logic appropriately. + */ + @Test + public void testRetryOutOfOrderScannerNextException() throws IOException { + expectRetryOutOfOrderScannerNext(() -> getScanner(CONN)); + } + + /** + * AsyncTable version of above + */ + @Test + public void testRetryOutOfOrderScannerNextExceptionAsync() throws IOException { + expectRetryOutOfOrderScannerNext(this::getAsyncScanner); + } + + /** + * verify that we honor the {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for normal scans. Use a + * special connection which has retries disabled, because otherwise the scanner will retry the + * timed out next() call and mess up the test. + */ + @Test + public void testNormalScanTimeoutOnNext() throws IOException { + setup(false); + // Unlike AsyncTable, Table's ResultScanner.next() call uses rpcTimeout and + // will retry until scannerTimeout. This makes it more difficult to test the timeouts + // of normal next() calls. So we use a separate connection here which has retries disabled. + Configuration confNoRetries = new Configuration(CONN.getConfiguration()); + confNoRetries.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 0); + try (Connection conn = ConnectionFactory.createConnection(confNoRetries)) { + expectTimeoutOnNext(rpcTimeout, () -> getScanner(conn)); + } + } + + /** + * AsyncTable version of above + */ + @Test + public void testNormalScanTimeoutOnNextAsync() throws IOException { + setup(false); + expectTimeoutOnNext(scanTimeout, this::getAsyncScanner); + } + + /** + * verify that we honor {@link HConstants#HBASE_RPC_READ_TIMEOUT_KEY} for openScanner() calls for + * meta scans + */ + @Test + public void testNormalScanTimeoutOnOpenScanner() throws IOException { + setup(false); + expectTimeoutOnOpenScanner(rpcTimeout, this::getScanner); + } + + /** + * AsyncTable version of above + */ + @Test + public void testNormalScanTimeoutOnOpenScannerAsync() throws IOException { + setup(false); + expectTimeoutOnOpenScanner(rpcTimeout, this::getAsyncScanner); + } + + /** + * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_SCANNER_TIMEOUT} for + * next() calls in meta scans + */ + @Test + public void testMetaScanTimeoutOnNext() throws IOException { + setup(true); + expectTimeoutOnNext(metaScanTimeout, this::getScanner); + } + + /** + * AsyncTable version of above + */ + @Test + public void testMetaScanTimeoutOnNextAsync() throws IOException { + setup(true); + expectTimeoutOnNext(metaScanTimeout, this::getAsyncScanner); + } + + /** + * verify that we honor {@link ConnectionConfiguration#HBASE_CLIENT_META_READ_RPC_TIMEOUT_KEY} for + * openScanner() calls for meta scans + */ + @Test + public void testMetaScanTimeoutOnOpenScanner() throws IOException { + setup(true); + expectTimeoutOnOpenScanner(metaScanTimeout, this::getScanner); + } + + /** + * AsyncTable version of above + */ + @Test + public void testMetaScanTimeoutOnOpenScannerAsync() throws IOException { + setup(true); + expectTimeoutOnOpenScanner(metaScanTimeout, this::getAsyncScanner); + } + + private void expectRetryOutOfOrderScannerNext(Supplier<ResultScanner> scannerSupplier) + throws IOException { + setup(false); + RSRpcServicesWithScanTimeout.seqNoToThrowOn = 1; + + LOG.info( + "Opening scanner, expecting no errors from first next() call from openScanner response"); + ResultScanner scanner = scannerSupplier.get(); + Result result = scanner.next(); + expectRow(ROW0, result); + expectNumTries(0); + + LOG.info("Making first next() RPC, expecting no errors for seqNo 0"); + result = scanner.next(); + expectRow(ROW1, result); + expectNumTries(0); + + LOG.info( + "Making second next() RPC, expecting OutOfOrderScannerNextException and appropriate retry"); + result = scanner.next(); + expectRow(ROW2, result); + expectNumTries(1); + + // reset so no errors. since last call restarted the scan and following + // call would otherwise fail + RSRpcServicesWithScanTimeout.seqNoToThrowOn = -1; + + LOG.info("Finishing scan, expecting no errors"); + result = scanner.next(); + expectRow(ROW3, result); + scanner.close(); + + LOG.info("Testing always throw exception"); + byte[][] expectedResults = new byte[][] { ROW0, ROW1, ROW2, ROW3 }; + int i = 0; + + // test the case that RPC always throws + scanner = scannerSupplier.get(); + RSRpcServicesWithScanTimeout.throwAlways = true; + + while (true) { + LOG.info("Calling scanner.next()"); + result = scanner.next(); + if (result == null) { + break; + } else { + byte[] expectedResult = expectedResults[i++]; + expectRow(expectedResult, result); + } + } + + // ensure we verified all rows. this along with the expectRow check above + // proves that we didn't miss any rows. + assertEquals("Expected to exhaust expectedResults array length=" + expectedResults.length + + ", actual index=" + i, expectedResults.length, i); + + // expect all but the first row (which came from initial openScanner) to have thrown an error + expectNumTries(expectedResults.length - 1); + + } + + private void expectTimeoutOnNext(int timeout, Supplier<ResultScanner> scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.seqNoToSleepOn = 1; + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + + LOG.info( + "Opening scanner, expecting no timeouts from first next() call from openScanner response"); + ResultScanner scanner = scannerSupplier.get(); + Result result = scanner.next(); + expectRow(ROW0, result); + + LOG.info("Making first next() RPC, expecting no timeout for seqNo 0"); + result = scanner.next(); + expectRow(ROW1, result); + + LOG.info("Making second next() RPC, expecting timeout"); + long start = System.nanoTime(); + try { + scanner.next(); + fail("Expected CallTimeoutException"); + } catch (RetriesExhaustedException e) { + assertTrue("Expected CallTimeoutException", e.getCause() instanceof CallTimeoutException + || e.getCause() instanceof SocketTimeoutException); + } + expectTimeout(start, timeout); + } + + private void expectTimeoutOnOpenScanner(int timeout, Supplier<ResultScanner> scannerSupplier) + throws IOException { + RSRpcServicesWithScanTimeout.setSleepForTimeout(timeout); + RSRpcServicesWithScanTimeout.sleepOnOpen = true; + LOG.info("Opening scanner, expecting timeout from first next() call from openScanner response"); + long start = System.nanoTime(); + try { + scannerSupplier.get().next(); + fail("Expected SocketTimeoutException or CallTimeoutException"); + } catch (RetriesExhaustedException e) { + LOG.info("Got error", e); + assertTrue("Expected SocketTimeoutException or CallTimeoutException, but was " + e.getCause(), + e.getCause() instanceof CallTimeoutException + || e.getCause() instanceof SocketTimeoutException); + } + expectTimeout(start, timeout); + } + + private void expectTimeout(long start, int timeout) { + long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + LOG.info("Expected duration >= {}, and got {}", timeout, duration); + assertTrue("Expected duration >= " + timeout + ", but was " + duration, duration >= timeout); + } + + private ResultScanner getScanner() { + return getScanner(CONN); + } + + private ResultScanner getScanner(Connection conn) { + Scan scan = new Scan(); + scan.setCaching(1); + try { + return conn.getTable(tableName).getScanner(scan); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private ResultScanner getAsyncScanner() { + Scan scan = new Scan(); + scan.setCaching(1); + return ASYNC_CONN.getTable(tableName).getScanner(scan); + } + + private void putToTable(Table ht, byte[] rowkey) throws IOException { + Put put = new Put(rowkey); + put.addColumn(FAMILY, QUALIFIER, VALUE); + ht.put(put); + } + + private static class RegionServerWithScanTimeout + extends MiniHBaseCluster.MiniHBaseClusterRegionServer { + public RegionServerWithScanTimeout(Configuration conf) + throws IOException, InterruptedException { + super(conf); + } + + @Override + protected RSRpcServices createRpcServices() throws IOException { + return new RSRpcServicesWithScanTimeout(this); + } + } + + private static class RSRpcServicesWithScanTimeout extends RSRpcServices { + private long tableScannerId; + + private static long seqNoToThrowOn = -1; + private static boolean throwAlways = false; + private static boolean threw; + + private static long seqNoToSleepOn = -1; + private static boolean sleepOnOpen = false; + private static volatile boolean slept; + private static int tryNumber = 0; + + private static int sleepTime = rpcTimeout + 500; + + public static void setSleepForTimeout(int timeout) { + sleepTime = timeout + 500; + } + + public static void reset() { + setSleepForTimeout(scanTimeout); + + seqNoToSleepOn = -1; + seqNoToThrowOn = -1; + throwAlways = false; + threw = false; + sleepOnOpen = false; + slept = false; + tryNumber = 0; + } + + public RSRpcServicesWithScanTimeout(HRegionServer rs) throws IOException { + super(rs); + } + + @Override + public ScanResponse scan(final RpcController controller, final ScanRequest request) + throws ServiceException { + if (request.hasScannerId()) { + LOG.info("Got request {}", request); + ScanResponse scanResponse = super.scan(controller, request); + if (tableScannerId != request.getScannerId() || request.getCloseScanner()) { + return scanResponse; + } + + if ( + throwAlways + || (!threw && request.hasNextCallSeq() && seqNoToThrowOn == request.getNextCallSeq()) + ) { + threw = true; + tryNumber++; + LOG.info("THROWING exception, tryNumber={}, tableScannerId={}", tryNumber, + tableScannerId); + throw new ServiceException(new OutOfOrderScannerNextException()); + } + + if (!slept && request.hasNextCallSeq() && seqNoToSleepOn == request.getNextCallSeq()) { + try { + LOG.info("SLEEPING " + sleepTime); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + slept = true; + tryNumber++; + } + return scanResponse; + } else { + ScanResponse scanRes = super.scan(controller, request); + String regionName = Bytes.toString(request.getRegion().getValue().toByteArray()); + if (!regionName.contains(TableName.META_TABLE_NAME.getNameAsString())) { + tableScannerId = scanRes.getScannerId(); + if (sleepOnOpen) { + try { + LOG.info("openScanner SLEEPING " + sleepTime); + Thread.sleep(sleepTime); + } catch (InterruptedException e) { + } + } + } + return scanRes; + } + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java index 60dfac56946..02dbb76568c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java @@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTestConst; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Result; @@ -91,6 +93,7 @@ public class TestScannerHeartbeatMessages { private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); private static Table TABLE = null; + private static Connection CONN = null; /** * Table configuration @@ -134,6 +137,7 @@ public class TestScannerHeartbeatMessages { conf.setStrings(HConstants.REGION_IMPL, HeartbeatHRegion.class.getName()); conf.setStrings(HConstants.REGION_SERVER_IMPL, HeartbeatHRegionServer.class.getName()); + // setting these here for usage on the server side. will override for client side below conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SERVER_TIMEOUT); conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1); @@ -142,6 +146,10 @@ public class TestScannerHeartbeatMessages { conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1); TEST_UTIL.startMiniCluster(1); + // set client timeout for client side, we want it to be less than server side. + Configuration clientConf = new Configuration(conf); + clientConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); + CONN = ConnectionFactory.createConnection(clientConf); TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); } @@ -180,11 +188,11 @@ public class TestScannerHeartbeatMessages { } static Table createTestTable(TableName name, byte[][] rows, byte[][] families, - byte[][] qualifiers, byte[] cellValue) throws IOException { - Table ht = TEST_UTIL.createTable(name, families); + byte[][] qualifiers, byte[] cellValue) throws IOException { + TEST_UTIL.createTable(name, families); + Table ht = CONN.getTable(name); List<Put> puts = createPuts(rows, families, qualifiers, cellValue); ht.put(puts); - ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT); return ht; } @@ -212,6 +220,7 @@ public class TestScannerHeartbeatMessages { @AfterClass public static void tearDownAfterClass() throws Exception { + CONN.close(); TEST_UTIL.shutdownMiniCluster(); }