Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 3737c4696 -> d316bf7c4


HBASE-16664 Timeout logic in AsyncProcess is broken

Signed-off-by: chenheng <chenh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d316bf7c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d316bf7c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d316bf7c

Branch: refs/heads/branch-1.3
Commit: d316bf7c4b3fd4de7a108c4e025c8ddb8dc0a0b8
Parents: 3737c46
Author: Phil Yang <ud1...@gmail.com>
Authored: Tue Oct 11 17:12:54 2016 +0800
Committer: chenheng <chenh...@apache.org>
Committed: Thu Oct 13 19:52:02 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  75 +++++---
 .../hbase/client/BufferedMutatorImpl.java       |  21 ++-
 .../hadoop/hbase/client/ConnectionManager.java  |   3 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |  22 ++-
 .../hadoop/hbase/client/HTableMultiplexer.java  |   5 +-
 .../hbase/client/MultiServerCallable.java       |  15 +-
 .../hbase/client/RetryingTimeTracker.java       |   3 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  22 ++-
 .../hbase/client/HConnectionTestingUtility.java |   5 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java | 172 +++++++++++++++++--
 10 files changed, 276 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index cdcb1b2..32de1e3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -232,7 +232,8 @@ class AsyncProcess {
   protected final long pause;
   protected int numTries;
   protected int serverTrackerTimeout;
-  protected int timeout;
+  protected int rpcTimeout;
+  protected int operationTimeout;
   protected long primaryCallTimeoutMicroseconds;
   // End configuration settings.
 
@@ -275,7 +276,8 @@ class AsyncProcess {
   }
 
   public AsyncProcess(ClusterConnection hc, Configuration conf, 
ExecutorService pool,
-      RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, 
RpcControllerFactory rpcFactory) {
+      RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, 
RpcControllerFactory rpcFactory,
+      int rpcTimeout) {
     if (hc == null) {
       throw new IllegalArgumentException("HConnection cannot be null.");
     }
@@ -290,8 +292,9 @@ class AsyncProcess {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    this.rpcTimeout = rpcTimeout;
+    this.operationTimeout = 
conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     this.primaryCallTimeoutMicroseconds = 
conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
 
     this.maxTotalConcurrentTasks = 
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
@@ -336,6 +339,14 @@ class AsyncProcess {
           DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
   }
 
+  public void setRpcTimeout(int rpcTimeout) {
+    this.rpcTimeout = rpcTimeout;
+  }
+
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
   /**
    * @return pool if non null, otherwise returns this.pool if non null, 
otherwise throws
    *         RuntimeException
@@ -561,12 +572,12 @@ class AsyncProcess {
    */
   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] 
results) {
-    return submitAll(null, tableName, rows, callback, results, null, timeout);
+    return submitAll(null, tableName, rows, callback, results, null, 
operationTimeout, rpcTimeout);
   }
 
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, 
TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] 
results) {
-    return submitAll(pool, tableName, rows, callback, results, null, timeout);
+    return submitAll(pool, tableName, rows, callback, results, null, 
operationTimeout, rpcTimeout);
   }
   /**
    * Submit immediately the list of rows, whatever the server status. Kept for 
backward
@@ -580,7 +591,7 @@ class AsyncProcess {
    */
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, 
TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] 
results,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      PayloadCarryingServerCallable callable, int operationTimeout, int 
rpcTimeout) {
     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
 
     // The position will be used by the processBatch to match the object array 
returned.
@@ -600,7 +611,7 @@ class AsyncProcess {
     }
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, 
results, results != null,
-        callable, curTimeout);
+        callable, operationTimeout, rpcTimeout);
     ars.groupAndSendMultiAction(actions, 1);
     return ars;
   }
@@ -752,12 +763,12 @@ class AsyncProcess {
           if (callable == null) {
             callable = createCallable(server, tableName, multiAction);
           }
-          RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+          RpcRetryingCaller<MultiResponse> caller = createCaller(callable, 
rpcTimeout);
           try {
             if (callsInProgress != null) {
               callsInProgress.add(callable);
             }
-            res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
+            res = caller.callWithoutRetries(callable, operationTimeout);
             if (res == null) {
               // Cancelled
               return;
@@ -823,11 +834,14 @@ class AsyncProcess {
     private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
     private PayloadCarryingServerCallable currentCallable;
-    private int currentCallTotalTimeout;
+    private int operationTimeout;
+    private int rpcTimeout;
+    private RetryingTimeTracker tracker;
 
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> 
actions, long nonceGroup,
         ExecutorService pool, boolean needResults, Object[] results,
-        Batch.Callback<CResult> callback, PayloadCarryingServerCallable 
callable, int timeout) {
+        Batch.Callback<CResult> callback, PayloadCarryingServerCallable 
callable,
+        int operationTimeout, int rpcTimeout) {
       this.pool = pool;
       this.callback = callback;
       this.nonceGroup = nonceGroup;
@@ -897,7 +911,11 @@ class AsyncProcess {
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
       this.currentCallable = callable;
-      this.currentCallTotalTimeout = timeout;
+      this.operationTimeout = operationTimeout;
+      this.rpcTimeout = rpcTimeout;
+      if (callable == null) {
+        this.tracker = new RetryingTimeTracker().start();
+      }
     }
 
     public Set<PayloadCarryingServerCallable> getCallsInProgress() {
@@ -1717,6 +1735,16 @@ class AsyncProcess {
       waitUntilDone();
       return results;
     }
+
+    /**
+     * Create a callable. Isolated to be easily overridden in the tests.
+     */
+    @VisibleForTesting
+    protected MultiServerCallable<Row> createCallable(final ServerName server,
+        TableName tableName, final MultiAction<Row> multi) {
+      return new MultiServerCallable<Row>(connection, tableName, server,
+          AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker);
+    }
   }
 
   private void updateStats(ServerName server, Map<byte[], 
MultiResponse.RegionResult> results) {
@@ -1738,10 +1766,10 @@ class AsyncProcess {
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
       TableName tableName, List<Action<Row>> actions, long nonceGroup, 
ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      PayloadCarryingServerCallable callable, int operationTimeout, int 
rpcTimeout) {
     return new AsyncRequestFutureImpl<CResult>(
         tableName, actions, nonceGroup, getPool(pool), needResults,
-        results, callback, callable, curTimeout);
+        results, callback, callable, operationTimeout, rpcTimeout);
   }
 
   @VisibleForTesting
@@ -1750,24 +1778,17 @@ class AsyncProcess {
       TableName tableName, List<Action<Row>> actions, long nonceGroup, 
ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults) 
{
     return createAsyncRequestFuture(
-        tableName, actions, nonceGroup, pool, callback, results, needResults, 
null, timeout);
-  }
-
-  /**
-   * Create a callable. Isolated to be easily overridden in the tests.
-   */
-  @VisibleForTesting
-  protected MultiServerCallable<Row> createCallable(final ServerName server,
-      TableName tableName, final MultiAction<Row> multi) {
-    return new MultiServerCallable<Row>(connection, tableName, server, 
this.rpcFactory, multi);
+        tableName, actions, nonceGroup, pool, callback, results, needResults, 
null,
+        operationTimeout, rpcTimeout);
   }
 
   /**
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
-  protected RpcRetryingCaller<MultiResponse> 
createCaller(PayloadCarryingServerCallable callable) {
-    return rpcCallerFactory.<MultiResponse> newCaller();
+  protected RpcRetryingCaller<MultiResponse> 
createCaller(PayloadCarryingServerCallable callable,
+      int rpcTimeout) {
+    return rpcCallerFactory.<MultiResponse> newCaller(rpcTimeout);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 273f2e4..d722821 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -71,6 +72,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
   private final int maxKeyValueSize;
   private boolean closed = false;
   private final ExecutorService pool;
+  private int rpcTimeout;
+  private int operationTimeout;
 
   @VisibleForTesting
   protected AsyncProcess ap; // non-final so can be overridden in test
@@ -92,9 +95,13 @@ public class BufferedMutatorImpl implements BufferedMutator {
         params.getWriteBufferSize() : tableConf.getWriteBufferSize();
     this.maxKeyValueSize = params.getMaxKeyValueSize() != 
BufferedMutatorParams.UNSET ?
         params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
-
+    this.rpcTimeout = 
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+    this.operationTimeout = conn.getConfiguration().getInt(
+        HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+                 HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     // puts need to track errors globally due to how the APIs currently work.
-    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, 
rpcFactory);
+    ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, 
rpcFactory, rpcTimeout);
   }
 
   @Override
@@ -279,6 +286,16 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
     return this.writeBufferSize;
   }
 
+  public void setRpcTimeout(int rpcTimeout) {
+    this.rpcTimeout = rpcTimeout;
+    this.ap.setRpcTimeout(rpcTimeout);
+  }
+
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+    this.ap.setOperationTimeout(operationTimeout);
+  }
+
   /**
    * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. 
This should not be
    * called from production uses.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index b055884..4e9d208 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -2333,7 +2333,8 @@ class ConnectionManager {
     // For tests to override.
     protected AsyncProcess createAsyncProcess(Configuration conf) {
       // No default pool available.
-      return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, 
rpcControllerFactory);
+      return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, 
rpcControllerFactory,
+          rpcTimeout);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index efa03c6..4c31099 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1035,13 +1035,12 @@ public class HTable implements HTableInterface, 
RegionLocator {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
+    final RetryingTimeTracker tracker = new RetryingTimeTracker().start();
     PayloadCarryingServerCallable<MultiResponse> callable =
       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
           rpcControllerFactory) {
         @Override
         public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
           controller.setPriority(tableName);
           int remainingTime = tracker.getRemainingTime(callTimeout);
           if (remainingTime == 0) {
@@ -1071,7 +1070,7 @@ public class HTable implements HTableInterface, 
RegionLocator {
         }
       };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, 
rm.getMutations(),
-        null, null, callable, operationTimeout);
+        null, null, callable, operationTimeout, rpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -1344,13 +1343,12 @@ public class HTable implements HTableInterface, 
RegionLocator {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final 
byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
     throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
+    final RetryingTimeTracker tracker = new RetryingTimeTracker().start();
     PayloadCarryingServerCallable<MultiResponse> callable =
       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
         rpcControllerFactory) {
         @Override
         public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
           controller.setPriority(tableName);
           int remainingTime = tracker.getRemainingTime(callTimeout);
           if (remainingTime == 0) {
@@ -1384,7 +1382,7 @@ public class HTable implements HTableInterface, 
RegionLocator {
      * */
     Object[] results = new Object[rm.getMutations().size()];
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, 
rm.getMutations(),
-      null, results, callable, operationTimeout);
+      null, results, callable, operationTimeout, rpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -1800,6 +1798,10 @@ public class HTable implements HTableInterface, 
RegionLocator {
 
   public void setOperationTimeout(int operationTimeout) {
     this.operationTimeout = operationTimeout;
+    if (mutator != null) {
+      mutator.setOperationTimeout(operationTimeout);
+    }
+    multiAp.setOperationTimeout(operationTimeout);
   }
 
   public int getOperationTimeout() {
@@ -1808,6 +1810,10 @@ public class HTable implements HTableInterface, 
RegionLocator {
 
   @Override public void setRpcTimeout(int rpcTimeout) {
     this.rpcTimeout = rpcTimeout;
+    if (mutator != null) {
+      mutator.setRpcTimeout(rpcTimeout);
+    }
+    multiAp.setRpcTimeout(rpcTimeout);
   }
 
   @Override public int getRpcTimeout() {
@@ -1891,7 +1897,7 @@ public class HTable implements HTableInterface, 
RegionLocator {
     AsyncProcess asyncProcess =
         new AsyncProcess(connection, configuration, pool,
             RpcRetryingCallerFactory.instantiate(configuration, 
connection.getStatisticsTracker()),
-            true, RpcControllerFactory.instantiate(configuration));
+            true, RpcControllerFactory.instantiate(configuration), rpcTimeout);
 
     AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
         new Callback<ClientProtos.CoprocessorServiceResult>() {
@@ -1941,6 +1947,8 @@ public class HTable implements HTableInterface, 
RegionLocator {
               .writeBufferSize(connConfiguration.getWriteBufferSize())
               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
       );
+      mutator.setRpcTimeout(rpcTimeout);
+      mutator.setOperationTimeout(operationTimeout);
     }
     return mutator;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index dfb0104..6863eab 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -450,7 +450,10 @@ public class HTableMultiplexer {
       this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
       RpcRetryingCallerFactory rpcCallerFactory = 
RpcRetryingCallerFactory.instantiate(conf);
       RpcControllerFactory rpcControllerFactory = 
RpcControllerFactory.instantiate(conf);
-      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, 
rpcControllerFactory);
+      int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+          HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+      this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, 
rpcControllerFactory,
+          rpcTimeout);
       this.executor = executor;
       this.maxRetryInQueue = 
conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index d0b4c81..115ba33 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -52,9 +52,12 @@ import com.google.protobuf.ServiceException;
 class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
+  private final RetryingTimeTracker tracker;
+  private final int rpcTimeout;
 
   MultiServerCallable(final ClusterConnection connection, final TableName 
tableName,
-      final ServerName location, RpcControllerFactory rpcFactory, final 
MultiAction<R> multi) {
+      final ServerName location, RpcControllerFactory rpcFactory, final 
MultiAction<R> multi,
+      int rpcTimeout, RetryingTimeTracker tracker) {
     super(connection, tableName, null, rpcFactory);
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a 
multi-region request.
@@ -62,6 +65,8 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
     // we will store the server here, and throw if someone tries to obtain 
location/regioninfo.
     this.location = new HRegionLocation(null, location);
     this.cellBlock = isCellBlock();
+    this.tracker = tracker;
+    this.rpcTimeout = rpcTimeout;
   }
 
   @Override
@@ -79,7 +84,13 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
   }
 
   @Override
-  public MultiResponse call(int callTimeout) throws IOException {
+  public MultiResponse call(int operationTimeout) throws IOException {
+    int remainingTime = tracker.getRemainingTime(operationTimeout);
+    if (remainingTime <= 1) {
+      // "1" is a special return value in RetryingTimeTracker, see its 
implementation.
+      throw new DoNotRetryIOException("Operation Timeout");
+    }
+    int callTimeout = Math.min(rpcTimeout, remainingTime);
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index 24288e6..406928a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -25,10 +25,11 @@ class RetryingTimeTracker {
 
   private long globalStartTime = -1;
 
-  public void start() {
+  public RetryingTimeTracker start() {
     if (this.globalStartTime < 0) {
       this.globalStartTime = EnvironmentEdgeManager.currentTime();
     }
+    return this;
   }
 
   public int getRemainingTime(int callTimeout) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 3a0c08d..d76a99f 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -155,14 +155,18 @@ public class TestAsyncProcess {
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, 
AtomicInteger nbThreads) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
           new SynchronousQueue<Runnable>(), new 
CountingThreadFactory(nbThreads)),
-            new RpcRetryingCallerFactory(conf), false, new 
RpcControllerFactory(conf));
+            new RpcRetryingCallerFactory(conf), false, new 
RpcControllerFactory(conf),
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     }
 
     public MyAsyncProcess(
         ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
       super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
         new SynchronousQueue<Runnable>(), new CountingThreadFactory(new 
AtomicInteger())),
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new 
RpcControllerFactory(conf));
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new 
RpcControllerFactory(conf),
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     }
 
     public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean 
useGlobalErrors,
@@ -174,7 +178,9 @@ public class TestAsyncProcess {
           throw new RejectedExecutionException("test under failure");
         }
       },
-          new RpcRetryingCallerFactory(conf), useGlobalErrors, new 
RpcControllerFactory(conf));
+          new RpcRetryingCallerFactory(conf), useGlobalErrors, new 
RpcControllerFactory(conf),
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     }
 
     @Override
@@ -187,7 +193,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable callable) {
+        PayloadCarryingServerCallable callable, int rpcTimeout) {
       callsCt.incrementAndGet();
       MultiServerCallable callable1 = (MultiServerCallable) callable;
       final MultiResponse mr = createMultiResponse(
@@ -250,7 +256,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-      PayloadCarryingServerCallable callable) {
+      PayloadCarryingServerCallable callable, int rpcTimeout) {
       callsCt.incrementAndGet();
       return new CallerWithFailure(ioe);
     }
@@ -287,7 +293,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable payloadCallable) {
+        PayloadCarryingServerCallable payloadCallable, int rpcTimeout) {
       MultiServerCallable<Row> callable = (MultiServerCallable) 
payloadCallable;
       final MultiResponse mr = createMultiResponse(
           callable.getMulti(), nbMultiResponse, nbActions, new 
ResponseGenerator() {
@@ -1118,7 +1124,9 @@ public class TestAsyncProcess {
     public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration 
conf,
         ExecutorService pool) {
       super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new 
RpcControllerFactory(
-          conf));
+          conf),
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 7b22ba4..dcac58f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -159,7 +160,9 @@ public class HConnectionTestingUtility {
     Mockito.when(c.getNonceGenerator()).thenReturn(ng);
     Mockito.when(c.getAsyncProcess()).thenReturn(
       new AsyncProcess(c, conf, null, 
RpcRetryingCallerFactory.instantiate(conf), false,
-          RpcControllerFactory.instantiate(conf)));
+          RpcControllerFactory.instantiate(conf),
+          conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+              HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
     Mockito.doNothing().when(c).incCount();
     Mockito.doNothing().when(c).decCount();
     Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(

http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 3307d42..c6482fe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -67,12 +67,14 @@ import 
org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -127,14 +129,14 @@ public class TestHCM {
 * This copro sleeps 20 seconds. The first call fails. The second time, it 
works.
 */
   public static class SleepAndFailFirstTime extends BaseRegionObserver {
-    static final AtomicLong ct = new AtomicLong(0);
-    static final String SLEEP_TIME_CONF_KEY =
-        "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
-    static final long DEFAULT_SLEEP_TIME = 20000;
-    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
+  static final AtomicLong ct = new AtomicLong(0);
+  static final String SLEEP_TIME_CONF_KEY =
+      "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
+  static final long DEFAULT_SLEEP_TIME = 20000;
+  static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
 
-    public SleepAndFailFirstTime() {
-    }
+  public SleepAndFailFirstTime() {
+  }
 
     @Override
     public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -145,12 +147,42 @@ public class TestHCM {
 
     @Override
     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
-              final Get get, final List<Cell> results) throws IOException {
+        final Get get, final List<Cell> results) throws IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+    }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Put put, final WALEdit edit, final Durability durability) throws 
IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+    }
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> 
e,
+        final Delete delete,
+        final WALEdit edit, final Durability durability) throws IOException {
       Threads.sleep(sleepTime.get());
-      if (ct.incrementAndGet() == 1){
+      if (ct.incrementAndGet() == 1) {
         throw new IOException("first call I fail");
       }
     }
+
+    @Override
+    public Result preIncrement(final 
ObserverContext<RegionCoprocessorEnvironment> e,
+        final Increment increment) throws IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+      return super.preIncrement(e, increment);
+    }
+
   }
 
   public static class SleepCoprocessor extends BaseRegionObserver {
@@ -160,16 +192,26 @@ public class TestHCM {
         final Get get, final List<Cell> results) throws IOException {
       Threads.sleep(SLEEP_TIME);
     }
-  }
 
-  public static class SleepWriteCoprocessor extends BaseRegionObserver {
-    public static final int SLEEP_TIME = 5000;
     @Override
     public Result preIncrement(final 
ObserverContext<RegionCoprocessorEnvironment> e,
-                               final Increment increment) throws IOException {
+        final Increment increment) throws IOException {
       Threads.sleep(SLEEP_TIME);
       return super.preIncrement(e, increment);
     }
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> 
e, final Delete delete,
+        final WALEdit edit, final Durability durability) throws IOException {
+      Threads.sleep(SLEEP_TIME);
+    }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Put put, final WALEdit edit, final Durability durability) throws 
IOException {
+      Threads.sleep(SLEEP_TIME);
+    }
+
   }
 
   public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver 
{
@@ -358,11 +400,12 @@ public class TestHCM {
    * timeouted when the server answers.
    */
   @Test
-  public void testOperationTimeout() throws Exception {
-    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
+  public void testGetOperationTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout");
     hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
-    HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, 
TEST_UTIL.getConfiguration());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, 
TEST_UTIL.getConfiguration());
     table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
     // Check that it works if the timeout is big enough
     table.setOperationTimeout(120 * 1000);
     table.get(new Get(FAM_NAM));
@@ -385,6 +428,99 @@ public class TestHCM {
     }
   }
 
+  @Test
+  public void testPutOperationTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout");
+    hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM 
},TEST_UTIL.getConfiguration());
+    table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
+    // Check that it works if the timeout is big enough
+    table.setOperationTimeout(120 * 1000);
+    table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+
+    // Resetting and retrying. Will fail this time, not enough time for the second try
+    SleepAndFailFirstTime.ct.set(0);
+    try {
+      table.setOperationTimeout(30 * 1000);
+      table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+      Assert.fail("We expect an exception here");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      // The client has a CallTimeout class, but it's not shared. We're not very clean today,
+      //  in the general case you can expect the call to stop, but the exception may vary.
+      // In this test however, we're sure that it will be a socket timeout.
+      LOG.info("We received an exception, as expected ", e);
+    } catch (IOException e) {
+      Assert.fail("Wrong exception:" + e.getMessage());
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
+  public void testDeleteOperationTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout");
+    hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM 
},TEST_UTIL.getConfiguration());
+    table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
+    // Check that it works if the timeout is big enough
+    table.setOperationTimeout(120 * 1000);
+    table.delete(new Delete(FAM_NAM));
+
+    // Resetting and retrying. Will fail this time, not enough time for the second try
+    SleepAndFailFirstTime.ct.set(0);
+    try {
+      table.setOperationTimeout(30 * 1000);
+      table.delete(new Delete(FAM_NAM));
+      Assert.fail("We expect an exception here");
+    } catch (IOException e) {
+      // The client has a CallTimeout class, but it's not shared. We're not very clean today,
+      //  in the general case you can expect the call to stop, but the exception may vary.
+      // In this test however, we're sure that it will be a socket timeout.
+      LOG.info("We received an exception, as expected ", e);
+    } finally {
+      table.close();
+    }
+  }
+  @Test
+  public void testDeleteRpcTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      Delete d = new Delete(FAM_NAM);
+      d.addColumn(FAM_NAM, FAM_NAM, 1);
+      t.delete(d);
+      fail("Write should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+  }
+
+  @Test
+  public void testPutRpcTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      Put p = new Put(FAM_NAM);
+      p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
+      t.put(p);
+      fail("Write should not have succeeded");
+    } catch (IOException e) {
+      // expected
+    }
+
+  }
+
   @Test(expected = RetriesExhaustedException.class)
   public void testRpcTimeout() throws Exception {
     HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
@@ -426,6 +562,7 @@ public class TestHCM {
     TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
 
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    SleepAndFailFirstTime.ct.set(0);
     c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
     c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
 
@@ -932,8 +1069,7 @@ public class TestHCM {
       curServer.getServerName().getPort(),
       conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
 
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
RPC_RETRY);
     table.close();
   }
 

Reply via email to