Repository: hbase Updated Branches: refs/heads/master f11aa4542 -> 88ff71b91
HBASE-16664 Timeout logic in AsyncProcess is broken Signed-off-by: chenheng <chenh...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/88ff71b9 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/88ff71b9 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/88ff71b9 Branch: refs/heads/master Commit: 88ff71b91b086984fdc5b8707d134a1d475e5103 Parents: f11aa45 Author: Phil Yang <ud1...@gmail.com> Authored: Sun Oct 9 15:25:11 2016 +0800 Committer: chenheng <chenh...@apache.org> Committed: Thu Oct 13 16:15:43 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 38 +++-- .../hbase/client/AsyncRequestFutureImpl.java | 64 +++---- .../hadoop/hbase/client/BufferedMutator.java | 10 ++ .../hbase/client/BufferedMutatorImpl.java | 20 ++- .../client/CancellableRegionServerCallable.java | 22 ++- .../hbase/client/ConnectionImplementation.java | 8 +- .../org/apache/hadoop/hbase/client/HTable.java | 48 +++--- .../hadoop/hbase/client/HTableMultiplexer.java | 6 +- .../hbase/client/MultiServerCallable.java | 15 +- .../client/NoncedRegionServerCallable.java | 2 +- .../hbase/client/RetryingTimeTracker.java | 3 +- .../RpcRetryingCallerWithReadReplicas.java | 14 +- .../hadoop/hbase/client/TestAsyncProcess.java | 39 +++-- .../hbase/client/HConnectionTestingUtility.java | 4 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 167 +++++++++++++++++-- 15 files changed, 338 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index f2d9546..abefc46 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; @@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import com.google.common.annotations.VisibleForTesting; - /** * This class allows a continuous flow of requests. It's written to be compatible with a * synchronous caller such as HTable. @@ -212,7 +212,8 @@ class AsyncProcess { protected final long pause; protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; /** Whether to log details for batch errors */ protected final boolean logBatchErrorDetails; @@ -220,7 +221,7 @@ class AsyncProcess { public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory, int rpcTimeout) { + RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } @@ -236,7 +237,8 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.timeout = rpcTimeout; + this.rpcTimeout = rpcTimeout; + this.operationTimeout = operationTimeout; this.primaryCallTimeoutMicroseconds = 
conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -434,7 +436,7 @@ class AsyncProcess { List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -448,6 +450,14 @@ class AsyncProcess { return ars; } + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + } + /** * Helper that is used when grouping the actions per region server. * @@ -473,7 +483,7 @@ class AsyncProcess { public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, timeout); + return submitAll(pool, tableName, rows, callback, results, null, -1); } /** * Submit immediately the list of rows, whatever the server status. Kept for backward @@ -484,10 +494,11 @@ class AsyncProcess { * @param rows the list of rows. * @param callback the callback. * @param results Optional array to return the results thru; backward compat. + * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting. */ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List<? 
extends Row> rows, Batch.Callback<CResult> callback, Object[] results, - CancellableRegionServerCallable callable, int curTimeout) { + CancellableRegionServerCallable callable, int rpcTimeout) { List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size()); // The position will be used by the processBatch to match the object array returned. @@ -507,7 +518,7 @@ class AsyncProcess { } AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, curTimeout); + callable, rpcTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -520,10 +531,11 @@ class AsyncProcess { protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<CResult> callback, Object[] results, boolean needResults, - CancellableRegionServerCallable callable, int curTimeout) { + CancellableRegionServerCallable callable, int rpcTimeout) { return new AsyncRequestFutureImpl<CResult>( tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout, this); + results, callback, callable, operationTimeout, + rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this); } /** Wait until the async does not have more than max tasks in progress. 
*/ @@ -664,8 +676,8 @@ class AsyncProcess { */ @VisibleForTesting protected RpcRetryingCaller<AbstractResponse> createCaller( - CancellableRegionServerCallable callable) { - return rpcCallerFactory.<AbstractResponse> newCaller(); + CancellableRegionServerCallable callable, int rpcTimeout) { + return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout); } http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 3894d58..a2642f3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -20,6 +20,24 @@ package org.apache.hadoop.hbase.client; import com.google.common.annotations.VisibleForTesting; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DoNotRetryIOException; @@ -39,23 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.htrace.Trace; -import java.io.IOException; -import 
java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; - /** * The context, and return value, for a single submit/submitAll call. * Note on how this class (one AP submit) works. Initially, all requests are split into groups @@ -70,6 +71,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class); + private RetryingTimeTracker tracker; + /** * Runnable (that can be submitted to thread pool) that waits for when it's time * to issue replica calls, finds region replicas, groups the requests by replica and @@ -219,12 +222,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { if (callable == null) { callable = createCallable(server, tableName, multiAction); } - RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable); + RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable,rpcTimeout); try { if (callsInProgress != null) { callsInProgress.add(callable); } - res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + res = caller.callWithoutRetries(callable, operationTimeout); if (res == null) { // Cancelled return; @@ -297,7 +300,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { private final boolean hasAnyReplicaGets; private final long nonceGroup; private CancellableRegionServerCallable currentCallable; - private int currentCallTotalTimeout; + private int operationTimeout; + private int rpcTimeout; private final 
Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); protected AsyncProcess asyncProcess; @@ -337,10 +341,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback<CResult> callback, - CancellableRegionServerCallable callable, int timeout, - AsyncProcess asyncProcess) { + ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback, + CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout, + AsyncProcess asyncProcess) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -410,9 +413,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { this.errorsByServer = createServerErrorTracker(); this.errors = (asyncProcess.globalErrors != null) ? asyncProcess.globalErrors : new BatchErrors(); + this.operationTimeout = operationTimeout; + this.rpcTimeout = rpcTimeout; this.currentCallable = callable; - this.currentCallTotalTimeout = timeout; - + if (callable == null) { + tracker = new RetryingTimeTracker().start(); + } } @VisibleForTesting @@ -1281,9 +1287,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { /** * Create a callable. Isolated to be easily overridden in the tests. 
*/ - private MultiServerCallable<Row> createCallable(final ServerName server, - TableName tableName, final MultiAction<Row> multi) { + private MultiServerCallable<Row> createCallable(final ServerName server, TableName tableName, + final MultiAction<Row> multi) { return new MultiServerCallable<Row>(asyncProcess.connection, tableName, server, - multi, asyncProcess.rpcFactory.newController()); + multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java index 5dc7fc3..fcc9af7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java @@ -120,6 +120,16 @@ public interface BufferedMutator extends Closeable { long getWriteBufferSize(); /** + * Set rpc timeout for this mutator instance + */ + void setRpcTimeout(int timeout); + + /** + * Set operation timeout for this mutator instance + */ + void setOperationTimeout(int timeout); + + /** * Listens for asynchronous exceptions on a {@link BufferedMutator}. 
*/ @InterfaceAudience.Public http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 2d4c8b3..f7eb09d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -82,6 +82,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private boolean closed = false; private final ExecutorService pool; private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor + private int operationTimeout; @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -107,9 +108,12 @@ public class BufferedMutatorImpl implements BufferedMutator { this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - + this.operationTimeout = conn.getConfiguration().getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // puts need to track errors globally due to how the APIs currently work. 
- ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, + writeRpcTimeout, operationTimeout); } @Override @@ -282,6 +286,18 @@ public class BufferedMutatorImpl implements BufferedMutator { return this.writeBufferSize; } + @Override + public void setRpcTimeout(int timeout) { + this.writeRpcTimeout = timeout; + ap.setRpcTimeout(timeout); + } + + @Override + public void setOperationTimeout(int timeout) { + this.operationTimeout = timeout; + ap.setOperationTimeout(operationTimeout); + } + private class QueueRowAccess implements RowAccess<Row> { private int remainder = undealtMutationCount.getAndSet(0); http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java index 69f5b55..a0ff900 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java @@ -30,15 +30,20 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; * This class is used to unify HTable calls with AsyncProcess Framework. HTable can use * AsyncProcess directly though this class. Also adds global timeout tracking on top of * RegionServerCallable and implements Cancellable. + * Global timeout tracking conflicts with logic in RpcRetryingCallerImpl's callWithRetries. So you + * can only use this callable in AsyncProcess which only uses callWithoutRetries and retries in its + * own implementation. 
*/ @InterfaceAudience.Private abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<T> implements Cancellable { - private final RetryingTimeTracker tracker = new RetryingTimeTracker(); - + private final RetryingTimeTracker tracker; + private final int rpcTimeout; CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row, - RpcController rpcController) { + RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) { super(connection, tableName, row, rpcController); + this.rpcTimeout = rpcTimeout; + this.tracker = tracker; } /* Override so can mess with the callTimeout. @@ -46,7 +51,7 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable< * @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int) */ @Override - public T call(int callTimeout) throws IOException { + public T call(int operationTimeout) throws IOException { if (isCancelled()) return null; if (Thread.interrupted()) { throw new InterruptedIOException(); @@ -54,11 +59,12 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable< // It is expected (it seems) that tracker.start can be called multiple times (on each trip // through the call when retrying). Also, we can call start and no need of a stop. this.tracker.start(); - int remainingTime = tracker.getRemainingTime(callTimeout); - if (remainingTime == 0) { - throw new DoNotRetryIOException("Timeout for mutate row"); + int remainingTime = tracker.getRemainingTime(operationTimeout); + if (remainingTime <= 1) { + // "1" is a special return value in RetryingTimeTracker, see its implementation. 
+ throw new DoNotRetryIOException("Operation rpcTimeout"); } - return super.call(remainingTime); + return super.call(Math.min(rpcTimeout, remainingTime)); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8db9dbf..9cf63dc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1831,8 +1831,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. 
- int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, + rpcTimeout, operationTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 84f8024..2802a2c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -441,7 +441,7 @@ public class HTable implements Table { RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas( rpcControllerFactory, tableName, this.connection, get, pool, connConfiguration.getRetriesNumber(), - operationTimeout, + operationTimeout, readRpcTimeout, connConfiguration.getPrimaryCallTimeoutMicroSecond()); return callable.call(operationTimeout); } @@ -479,15 +479,10 @@ public class HTable implements Table { batch(actions, results, -1); } - public void batch(final List<? extends Row> actions, final Object[] results, int timeout) + public void batch(final List<? 
extends Row> actions, final Object[] results, int rpcTimeout) throws InterruptedException, IOException { - AsyncRequestFuture ars = null; - if (timeout != -1) { - ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout); - } else { - // use default timeout in AP - ars = multiAp.submitAll(pool, tableName, actions, null, results); - } + AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, + rpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -523,7 +518,8 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable<SingleResponse> callable = new CancellableRegionServerCallable<SingleResponse>( - connection, getName(), delete.getRow(), this.rpcControllerFactory.newController()) { + connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(), + writeRpcTimeout, new RetryingTimeTracker().start()) { @Override protected SingleResponse rpcCall() throws Exception { MutateRequest request = RequestConverter.buildMutateRequest( @@ -535,7 +531,7 @@ public class HTable implements Table { List<Row> rows = new ArrayList<Row>(); rows.add(delete); AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, null, callable, operationTimeout); + null, null, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -593,7 +589,7 @@ public class HTable implements Table { public void mutateRow(final RowMutations rm) throws IOException { CancellableRegionServerCallable<MultiResponse> callable = new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){ @Override protected MultiResponse rpcCall() throws Exception { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( @@ -614,7 +610,7 @@ public class 
HTable implements Table { } }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, operationTimeout); + null, null, callable, writeRpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -798,7 +794,8 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable<SingleResponse> callable = new CancellableRegionServerCallable<SingleResponse>( - this.connection, getName(), row, this.rpcControllerFactory.newController()) { + this.connection, getName(), row, this.rpcControllerFactory.newController(), + writeRpcTimeout, new RetryingTimeTracker().start()) { @Override protected SingleResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -814,7 +811,7 @@ public class HTable implements Table { Object[] results = new Object[1]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows, - null, results, callable, operationTimeout); + null, results, callable, -1); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -831,7 +828,7 @@ public class HTable implements Table { throws IOException { CancellableRegionServerCallable<MultiResponse> callable = new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(), - rpcControllerFactory.newController()) { + rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) { @Override protected MultiResponse rpcCall() throws Exception { CompareType compareType = CompareType.valueOf(compareOp.name()); @@ -858,7 +855,7 @@ public class HTable implements Table { * */ Object[] results = new Object[rm.getMutations().size()]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, operationTimeout); + null, results, callable, -1); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1117,6 +1114,10 @@ public class HTable implements Table { 
@Override public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; + if (mutator != null) { + mutator.setOperationTimeout(operationTimeout); + } + multiAp.setOperationTimeout(operationTimeout); } @Override @@ -1133,8 +1134,8 @@ public class HTable implements Table { @Override @Deprecated public void setRpcTimeout(int rpcTimeout) { - this.readRpcTimeout = rpcTimeout; - this.writeRpcTimeout = rpcTimeout; + setReadRpcTimeout(rpcTimeout); + setWriteRpcTimeout(rpcTimeout); } @Override @@ -1145,6 +1146,10 @@ public class HTable implements Table { @Override public void setWriteRpcTimeout(int writeRpcTimeout) { this.writeRpcTimeout = writeRpcTimeout; + if (mutator != null) { + mutator.setRpcTimeout(writeRpcTimeout); + } + multiAp.setRpcTimeout(writeRpcTimeout); } @Override @@ -1229,7 +1234,8 @@ public class HTable implements Table { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration), readRpcTimeout); + true, RpcControllerFactory.instantiate(configuration), readRpcTimeout, + operationTimeout); AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs, new Callback<ClientProtos.CoprocessorServiceResult>() { @@ -1281,6 +1287,8 @@ public class HTable implements Table { .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); } + mutator.setRpcTimeout(writeRpcTimeout); + mutator.setOperationTimeout(operationTimeout); return mutator; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index 
2c1a61e..e8379ef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -442,6 +442,7 @@ public class HTableMultiplexer { private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor + private final int operationTimeout; public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, @@ -454,7 +455,10 @@ public class HTableMultiplexer { this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout); + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, + writeRpcTimeout, operationTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index 6067ef0..7d50a27 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -50,12 +50,13 @@ import 
com.google.common.annotations.VisibleForTesting; */ @InterfaceAudience.Private class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> { - private final MultiAction<R> multiAction; - private final boolean cellBlock; + private MultiAction<R> multiAction; + private boolean cellBlock; MultiServerCallable(final ClusterConnection connection, final TableName tableName, - final ServerName location, final MultiAction<R> multi, RpcController rpcController) { - super(connection, tableName, null, rpcController); + final ServerName location, final MultiAction<R> multi, RpcController rpcController, + int rpcTimeout, RetryingTimeTracker tracker) { + super(connection, tableName, null, rpcController, rpcTimeout, tracker); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. // Using region info from parent HRegionLocation would be a mistake for this class; so @@ -64,6 +65,12 @@ class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiRespon this.cellBlock = isCellBlock(); } + public void reset(ServerName location, MultiAction<R> multiAction) { + this.location = new HRegionLocation(null, location); + this.multiAction = multiAction; + this.cellBlock = isCellBlock(); + } + @Override protected HRegionLocation getLocation() { throw new RuntimeException("Cannot get region location for multi-region request"); http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java index aff0205..52ed263 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java +++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController; * @param <T> the class that the ServerCallable handles */ @InterfaceAudience.Private -public abstract class NoncedRegionServerCallable<T> extends CancellableRegionServerCallable<T> { +public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallable<T> { private final long nonce; /** http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java index b9438e6..e804e92 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -24,10 +24,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; class RetryingTimeTracker { private long globalStartTime = -1; - public void start() { + public RetryingTimeTracker start() { if (this.globalStartTime < 0) { this.globalStartTime = EnvironmentEdgeManager.currentTime(); } + return this; } public int getRemainingTime(int callTimeout) { http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java index 04553d2..a290c78 100644 --- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java @@ -58,7 +58,8 @@ public class RpcRetryingCallerWithReadReplicas { protected final Get get; protected final TableName tableName; protected final int timeBeforeReplicas; - private final int callTimeout; + private final int operationTimeout; + private final int rpcTimeout; private final int retries; private final RpcControllerFactory rpcControllerFactory; private final RpcRetryingCallerFactory rpcRetryingCallerFactory; @@ -66,7 +67,7 @@ public class RpcRetryingCallerWithReadReplicas { public RpcRetryingCallerWithReadReplicas( RpcControllerFactory rpcControllerFactory, TableName tableName, ClusterConnection cConnection, final Get get, - ExecutorService pool, int retries, int callTimeout, + ExecutorService pool, int retries, int operationTimeout, int rpcTimeout, int timeBeforeReplicas) { this.rpcControllerFactory = rpcControllerFactory; this.tableName = tableName; @@ -75,7 +76,8 @@ public class RpcRetryingCallerWithReadReplicas { this.get = get; this.pool = pool; this.retries = retries; - this.callTimeout = callTimeout; + this.operationTimeout = operationTimeout; + this.rpcTimeout = rpcTimeout; this.timeBeforeReplicas = timeBeforeReplicas; this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf); } @@ -91,7 +93,7 @@ public class RpcRetryingCallerWithReadReplicas { public ReplicaRegionServerCallable(int id, HRegionLocation location) { super(RpcRetryingCallerWithReadReplicas.this.cConnection, RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(), - rpcControllerFactory.newController()); + rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker()); this.id = id; this.location = location; } @@ -133,7 +135,7 @@ public class RpcRetryingCallerWithReadReplicas { ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get); 
HBaseRpcController hrc = (HBaseRpcController)getRpcController(); hrc.reset(); - hrc.setCallTimeout(callTimeout); + hrc.setCallTimeout(rpcTimeout); hrc.setPriority(tableName); ClientProtos.GetResponse response = getStub().get(hrc, request); if (response == null) { @@ -258,7 +260,7 @@ public class RpcRetryingCallerWithReadReplicas { for (int id = min; id <= max; id++) { HRegionLocation hrl = rl.getRegionLocation(id); ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl); - cs.submit(callOnReplica, callTimeout, id); + cs.submit(callOnReplica, operationTimeout, id); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 0703e51..ed521a3 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -152,7 +152,10 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>(); public AtomicInteger callsCt = new AtomicInteger(); - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); private long previousTimeout = -1; @Override protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, @@ -162,7 +165,7 @@ public class 
TestAsyncProcess { // Test HTable has tableName of null, so pass DUMMY_TABLE AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>( DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout, this); + results, callback, callable, operationTimeout, rpcTimeout, this); allReqs.add(r); return r; } @@ -174,14 +177,16 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout, + operationTimeout); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), + rpcTimeout, operationTimeout); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -193,7 +198,8 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), + rpcTimeout, operationTimeout); } @Override @@ -213,7 +219,7 @@ public class TestAsyncProcess { } @Override protected RpcRetryingCaller<AbstractResponse> createCaller( - CancellableRegionServerCallable callable) { + CancellableRegionServerCallable callable, int rpcTimeout) { 
callsCt.incrementAndGet(); MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( @@ -254,12 +260,11 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { public MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, - ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback callback, - CancellableRegionServerCallable callable, int timeout, - AsyncProcess asyncProcess) { + ExecutorService pool, boolean needResults, Object[] results, + Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout, + int rpcTimeout, AsyncProcess asyncProcess) { super(tableName, actions, nonceGroup, pool, needResults, - results, callback, callable, timeout, asyncProcess); + results, callback, callable, operationTimeout, rpcTimeout, asyncProcess); } @Override @@ -299,7 +304,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller<AbstractResponse> createCaller( - CancellableRegionServerCallable callable) { + CancellableRegionServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); return new CallerWithFailure(ioe); } @@ -351,7 +356,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller<AbstractResponse> createCaller( - CancellableRegionServerCallable payloadCallable) { + CancellableRegionServerCallable payloadCallable, int rpcTimeout) { MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @@ -1638,12 +1643,14 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { - private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); - + private static int rpcTimeout = 
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf), rpcTimeout); + conf), rpcTimeout, operationTimeout); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 658fa96..ee89609 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -128,7 +128,9 @@ public class HConnectionTestingUtility { Mockito.when(c.getAsyncProcess()).thenReturn( new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT))); + HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT))); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index f9ebc47..3416e54 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -135,12 +135,42 @@ public class TestHCM { @Override public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, - final Get get, final List<Cell> results) throws IOException { + final Get get, final List<Cell> results) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(sleepTime.get()); - if (ct.incrementAndGet() == 1){ + if (ct.incrementAndGet() == 1) { throw new IOException("first call I fail"); } } + + @Override + public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, + final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, + final Increment increment) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + return super.preIncrement(e, increment); + } + } public static class SleepCoprocessor extends BaseRegionObserver { @@ -156,16 +186,20 @@ public class TestHCM { final Put put, final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(SLEEP_TIME); } - } - public static class SleepWriteCoprocessor extends BaseRegionObserver { - public static final int 
SLEEP_TIME = 5000; @Override public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, final Increment increment) throws IOException { Threads.sleep(SLEEP_TIME); return super.preIncrement(e, increment); } + + @Override + public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { @@ -364,11 +398,12 @@ public class TestHCM { * timeouted when the server answers. */ @Test - public void testOperationTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); + public void testGetOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); - Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}); + Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -392,6 +427,64 @@ public class TestHCM { } @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. 
Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared. We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. + LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test + public void testDeleteOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.delete(new Delete(FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.delete(new Delete(FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared. We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. 
+ LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); @@ -419,14 +512,14 @@ public class TestHCM { } @Test - public void testWriteRpcTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout"); - hdt.addCoprocessor(SleepWriteCoprocessor.class.getName()); + public void testIncrementRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { - t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2); - t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100); + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); Increment i = new Increment(FAM_NAM); i.addColumn(FAM_NAM, FAM_NAM, 1); t.increment(i); @@ -436,7 +529,7 @@ public class TestHCM { } // Again, with configuration based override - c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2); + c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); try (Connection conn = ConnectionFactory.createConnection(c)) { try (Table t = conn.getTable(hdt.getTableName())) { Increment i = new Increment(FAM_NAM); @@ -450,8 +543,46 @@ public class TestHCM { } @Test - public void testReadRpcTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout"); + public void testDeleteRpcTimeout() throws Exception { + HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Delete d = new Delete(FAM_NAM); + d.addColumn(FAM_NAM, FAM_NAM, 1); + t.delete(d); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Put p = new Put(FAM_NAM); + p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); + t.put(p); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testGetRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); @@ -502,6 +633,7 @@ public class TestHCM { TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close(); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + SleepAndFailFirstTime.ct.set(0); c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000); c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000); @@ -1013,8 +1145,7 @@ public class TestHCM { curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort()); - 
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); table.close(); connection.close(); }