Repository: hbase
Updated Branches:
  refs/heads/branch-1 e2278f954 -> 8f9fadf02


HBASE-16664 Timeout logic in AsyncProcess is broken

Signed-off-by: chenheng <chenh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8f9fadf0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8f9fadf0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8f9fadf0

Branch: refs/heads/branch-1
Commit: 8f9fadf0216977996564ec56347a91e5a0a8b945
Parents: e2278f9
Author: Phil Yang <ud1...@gmail.com>
Authored: Sun Oct 9 19:31:45 2016 +0800
Committer: chenheng <chenh...@apache.org>
Committed: Thu Oct 13 17:14:52 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  73 +++++---
 .../hbase/client/BufferedMutatorImpl.java       |  15 +-
 .../org/apache/hadoop/hbase/client/HTable.java  |  29 ++-
 .../hbase/client/MultiServerCallable.java       |  15 +-
 .../hbase/client/RetryingTimeTracker.java       |   3 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  13 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java | 182 ++++++++++++++++---
 7 files changed, 259 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9fadf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 647a466..b0652a7 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -259,7 +259,8 @@ class AsyncProcess {
   protected final long pause;
   protected int numTries;
   protected int serverTrackerTimeout;
-  protected int timeout;
+  protected int rpcTimeout;
+  protected int operationTimeout;
   protected long primaryCallTimeoutMicroseconds;
   /** Whether to log details for batch errors */
   private final boolean logBatchErrorDetails;
@@ -322,7 +323,9 @@ class AsyncProcess {
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-    this.timeout = rpcTimeout;
+    this.rpcTimeout = rpcTimeout;
+    this.operationTimeout = 
conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     this.primaryCallTimeoutMicroseconds = 
conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
 
     this.maxTotalConcurrentTasks = 
conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
@@ -378,6 +381,14 @@ class AsyncProcess {
           DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
   }
 
+  public void setRpcTimeout(int rpcTimeout) {
+    this.rpcTimeout = rpcTimeout;
+  }
+
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
   /**
    * @return pool if non null, otherwise returns this.pool if non null, 
otherwise throws
    *         RuntimeException
@@ -570,12 +581,12 @@ class AsyncProcess {
    */
   public <CResult> AsyncRequestFuture submitAll(TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] 
results) {
-    return submitAll(null, tableName, rows, callback, results, null, timeout);
+    return submitAll(null, tableName, rows, callback, results, null, 
operationTimeout, rpcTimeout);
   }
 
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, 
TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] 
results) {
-    return submitAll(pool, tableName, rows, callback, results, null, timeout);
+    return submitAll(pool, tableName, rows, callback, results, null, 
operationTimeout, rpcTimeout);
   }
   /**
    * Submit immediately the list of rows, whatever the server status. Kept for 
backward
@@ -589,7 +600,7 @@ class AsyncProcess {
    */
   public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, 
TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] 
results,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      PayloadCarryingServerCallable callable, int operationTimeout, int 
rpcTimeout) {
     List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
 
     // The position will be used by the processBatch to match the object array 
returned.
@@ -609,7 +620,7 @@ class AsyncProcess {
     }
     AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
         tableName, actions, ng.getNonceGroup(), getPool(pool), callback, 
results, results != null,
-        callable, curTimeout);
+        callable, operationTimeout, rpcTimeout);
     ars.groupAndSendMultiAction(actions, 1);
     return ars;
   }
@@ -779,12 +790,12 @@ class AsyncProcess {
           if (callable == null) {
             callable = createCallable(server, tableName, multiAction);
           }
-          RpcRetryingCaller<MultiResponse> caller = createCaller(callable);
+          RpcRetryingCaller<MultiResponse> caller = createCaller(callable, 
rpcTimeout);
           try {
             if (callsInProgress != null) {
               callsInProgress.add(callable);
             }
-            res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
+            res = caller.callWithoutRetries(callable, operationTimeout);
             if (res == null) {
               // Cancelled
               return;
@@ -850,11 +861,15 @@ class AsyncProcess {
     private final boolean hasAnyReplicaGets;
     private final long nonceGroup;
     private PayloadCarryingServerCallable currentCallable;
-    private int currentCallTotalTimeout;
+    private int operationTimeout;
+    private int rpcTimeout;
     private final Map<ServerName, List<Long>> heapSizesByServer = new 
HashMap<>();
+    private RetryingTimeTracker tracker;
+
     public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> 
actions, long nonceGroup,
         ExecutorService pool, boolean needResults, Object[] results,
-        Batch.Callback<CResult> callback, PayloadCarryingServerCallable 
callable, int timeout) {
+        Batch.Callback<CResult> callback, PayloadCarryingServerCallable 
callable,
+        int operationTimeout, int rpcTimeout) {
       this.pool = pool;
       this.callback = callback;
       this.nonceGroup = nonceGroup;
@@ -924,7 +939,12 @@ class AsyncProcess {
       this.errorsByServer = createServerErrorTracker();
       this.errors = (globalErrors != null) ? globalErrors : new BatchErrors();
       this.currentCallable = callable;
-      this.currentCallTotalTimeout = timeout;
+      this.operationTimeout = operationTimeout;
+      this.rpcTimeout = rpcTimeout;
+      if (callable == null) {
+        tracker = new RetryingTimeTracker();
+        tracker.start();
+      }
     }
 
     public Set<PayloadCarryingServerCallable> getCallsInProgress() {
@@ -1759,6 +1779,16 @@ class AsyncProcess {
       waitUntilDone();
       return results;
     }
+
+    /**
+     * Create a callable. Isolated to be easily overridden in the tests.
+     */
+    @VisibleForTesting
+    protected MultiServerCallable<Row> createCallable(final ServerName server,
+        TableName tableName, final MultiAction<Row> multi) {
+      return new MultiServerCallable<Row>(connection, tableName, server,
+          AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker);
+    }
   }
 
   @VisibleForTesting
@@ -1781,10 +1811,10 @@ class AsyncProcess {
   protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
       TableName tableName, List<Action<Row>> actions, long nonceGroup, 
ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults,
-      PayloadCarryingServerCallable callable, int curTimeout) {
+      PayloadCarryingServerCallable callable, int operationTimeout, int 
rpcTimeout) {
     return new AsyncRequestFutureImpl<CResult>(
         tableName, actions, nonceGroup, getPool(pool), needResults,
-        results, callback, callable, curTimeout);
+        results, callback, callable, operationTimeout, rpcTimeout);
   }
 
   @VisibleForTesting
@@ -1793,24 +1823,17 @@ class AsyncProcess {
       TableName tableName, List<Action<Row>> actions, long nonceGroup, 
ExecutorService pool,
       Batch.Callback<CResult> callback, Object[] results, boolean needResults) 
{
     return createAsyncRequestFuture(
-        tableName, actions, nonceGroup, pool, callback, results, needResults, 
null, timeout);
-  }
-
-  /**
-   * Create a callable. Isolated to be easily overridden in the tests.
-   */
-  @VisibleForTesting
-  protected MultiServerCallable<Row> createCallable(final ServerName server,
-      TableName tableName, final MultiAction<Row> multi) {
-    return new MultiServerCallable<Row>(connection, tableName, server, 
this.rpcFactory, multi);
+        tableName, actions, nonceGroup, pool, callback, results, needResults, 
null,
+        operationTimeout, rpcTimeout);
   }
 
   /**
    * Create a caller. Isolated to be easily overridden in the tests.
    */
   @VisibleForTesting
-  protected RpcRetryingCaller<MultiResponse> 
createCaller(PayloadCarryingServerCallable callable) {
-    return rpcCallerFactory.<MultiResponse> newCaller();
+  protected RpcRetryingCaller<MultiResponse> 
createCaller(PayloadCarryingServerCallable callable,
+      int rpcTimeout) {
+    return rpcCallerFactory.<MultiResponse> newCaller(rpcTimeout);
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9fadf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index e12b34d..1974be3 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -81,6 +81,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
   private boolean closed = false;
   private final ExecutorService pool;
   private int writeRpcTimeout; // needed to pass in through AsyncProcess 
constructor
+  private int operationTimeout;
 
   @VisibleForTesting
   protected AsyncProcess ap; // non-final so can be overridden in test
@@ -106,7 +107,9 @@ public class BufferedMutatorImpl implements BufferedMutator 
{
     this.writeRpcTimeout = 
conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
         conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
             HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-
+    this.operationTimeout = conn.getConfiguration().getInt(
+        HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
     // puts need to track errors globally due to how the APIs currently work.
     ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, 
rpcFactory, writeRpcTimeout);
   }
@@ -281,6 +284,16 @@ public class BufferedMutatorImpl implements 
BufferedMutator {
     return this.writeBufferSize;
   }
 
+  public void setRpcTimeout(int writeRpcTimeout) {
+    this.writeRpcTimeout = writeRpcTimeout;
+    this.ap.setRpcTimeout(writeRpcTimeout);
+  }
+
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+    this.ap.setOperationTimeout(operationTimeout);
+  }
+
   /**
    * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. 
This should not beÓ
    * called from production uses.

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9fadf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 5fc2d65..e8a969f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -911,9 +911,10 @@ public class HTable implements HTableInterface, 
RegionLocator {
     }
   }
 
-  public void batch(final List<? extends Row> actions, final Object[] results, 
int timeout)
+  public void batch(final List<? extends Row> actions, final Object[] results, 
int rpcTimeout)
       throws InterruptedException, IOException {
-    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, 
results, null, timeout);
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, 
results, null,
+        operationTimeout, rpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -1055,13 +1056,12 @@ public class HTable implements HTableInterface, 
RegionLocator {
    */
   @Override
   public void mutateRow(final RowMutations rm) throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
+    final RetryingTimeTracker tracker = new RetryingTimeTracker().start();
     PayloadCarryingServerCallable<MultiResponse> callable =
       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
           rpcControllerFactory) {
         @Override
         public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
           controller.setPriority(tableName);
           int remainingTime = tracker.getRemainingTime(callTimeout);
           if (remainingTime == 0) {
@@ -1091,7 +1091,7 @@ public class HTable implements HTableInterface, 
RegionLocator {
         }
       };
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, 
rm.getMutations(),
-        null, null, callable, operationTimeout);
+        null, null, callable, operationTimeout, writeRpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -1364,13 +1364,12 @@ public class HTable implements HTableInterface, 
RegionLocator {
   public boolean checkAndMutate(final byte [] row, final byte [] family, final 
byte [] qualifier,
     final CompareOp compareOp, final byte [] value, final RowMutations rm)
     throws IOException {
-    final RetryingTimeTracker tracker = new RetryingTimeTracker();
+    final RetryingTimeTracker tracker = new RetryingTimeTracker().start();
     PayloadCarryingServerCallable<MultiResponse> callable =
       new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), 
rm.getRow(),
         rpcControllerFactory) {
         @Override
         public MultiResponse call(int callTimeout) throws IOException {
-          tracker.start();
           controller.setPriority(tableName);
           int remainingTime = tracker.getRemainingTime(callTimeout);
           if (remainingTime == 0) {
@@ -1404,7 +1403,7 @@ public class HTable implements HTableInterface, 
RegionLocator {
      * */
     Object[] results = new Object[rm.getMutations().size()];
     AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, 
rm.getMutations(),
-      null, results, callable, operationTimeout);
+      null, results, callable, operationTimeout, writeRpcTimeout);
     ars.waitUntilDone();
     if (ars.hasError()) {
       throw ars.getErrors();
@@ -1809,6 +1808,10 @@ public class HTable implements HTableInterface, 
RegionLocator {
 
   public void setOperationTimeout(int operationTimeout) {
     this.operationTimeout = operationTimeout;
+    if (mutator != null) {
+      mutator.setOperationTimeout(operationTimeout);
+    }
+    multiAp.setOperationTimeout(operationTimeout);
   }
 
   public int getOperationTimeout() {
@@ -1824,8 +1827,8 @@ public class HTable implements HTableInterface, 
RegionLocator {
   @Override
   @Deprecated
   public void setRpcTimeout(int rpcTimeout) {
-    this.readRpcTimeout = rpcTimeout;
-    this.writeRpcTimeout = rpcTimeout;
+    setWriteRpcTimeout(rpcTimeout);
+    setReadRpcTimeout(rpcTimeout);
   }
 
   @Override
@@ -1836,6 +1839,10 @@ public class HTable implements HTableInterface, 
RegionLocator {
   @Override
   public void setWriteRpcTimeout(int writeRpcTimeout) {
     this.writeRpcTimeout = writeRpcTimeout;
+    if (mutator != null) {
+      mutator.setRpcTimeout(writeRpcTimeout);
+    }
+    multiAp.setRpcTimeout(writeRpcTimeout);
   }
 
   @Override
@@ -1973,6 +1980,8 @@ public class HTable implements HTableInterface, 
RegionLocator {
               .writeBufferSize(connConfiguration.getWriteBufferSize())
               .maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
       );
+      mutator.setRpcTimeout(writeRpcTimeout);
+      mutator.setOperationTimeout(operationTimeout);
     }
     return mutator;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9fadf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index d0b4c81..115ba33 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -52,9 +52,12 @@ import com.google.protobuf.ServiceException;
 class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse> {
   private final MultiAction<R> multiAction;
   private final boolean cellBlock;
+  private final RetryingTimeTracker tracker;
+  private final int rpcTimeout;
 
   MultiServerCallable(final ClusterConnection connection, final TableName 
tableName,
-      final ServerName location, RpcControllerFactory rpcFactory, final 
MultiAction<R> multi) {
+      final ServerName location, RpcControllerFactory rpcFactory, final 
MultiAction<R> multi,
+      int rpcTimeout, RetryingTimeTracker tracker) {
     super(connection, tableName, null, rpcFactory);
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a 
multi-region request.
@@ -62,6 +65,8 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
     // we will store the server here, and throw if someone tries to obtain 
location/regioninfo.
     this.location = new HRegionLocation(null, location);
     this.cellBlock = isCellBlock();
+    this.tracker = tracker;
+    this.rpcTimeout = rpcTimeout;
   }
 
   @Override
@@ -79,7 +84,13 @@ class MultiServerCallable<R> extends 
PayloadCarryingServerCallable<MultiResponse
   }
 
   @Override
-  public MultiResponse call(int callTimeout) throws IOException {
+  public MultiResponse call(int operationTimeout) throws IOException {
+    int remainingTime = tracker.getRemainingTime(operationTimeout);
+    if (remainingTime <= 1) {
+      // "1" is a special return value in RetryingTimeTracker, see its 
implementation.
+      throw new DoNotRetryIOException("Operation Timeout");
+    }
+    int callTimeout = Math.min(rpcTimeout, remainingTime);
     int countOfActions = this.multiAction.size();
     if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions");
     MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9fadf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index 24288e6..406928a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -25,10 +25,11 @@ class RetryingTimeTracker {
 
   private long globalStartTime = -1;
 
-  public void start() {
+  public RetryingTimeTracker start() {
     if (this.globalStartTime < 0) {
       this.globalStartTime = EnvironmentEdgeManager.currentTime();
     }
+    return this;
   }
 
   public int getRemainingTime(int callTimeout) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9fadf0/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 9a566e8..3b7f395 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -211,9 +211,10 @@ public class TestAsyncProcess {
     @Override
     public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, 
TableName tableName,
       List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] 
results,
-      PayloadCarryingServerCallable callable, int curTimeout) {
-      previousTimeout = curTimeout;
-      return super.submitAll(pool, tableName, rows, callback, results, 
callable, curTimeout);
+      PayloadCarryingServerCallable callable, int operationTimeout, int 
rpcTimeout) {
+      previousTimeout = rpcTimeout;
+      return super.submitAll(pool, tableName, rows, callback, results, 
callable, operationTimeout,
+          rpcTimeout);
     }
 
     @Override
@@ -222,7 +223,7 @@ public class TestAsyncProcess {
     }
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable callable) {
+        PayloadCarryingServerCallable callable, int rpcTimeout) {
       callsCt.incrementAndGet();
       MultiServerCallable callable1 = (MultiServerCallable) callable;
       final MultiResponse mr = createMultiResponse(
@@ -285,7 +286,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-      PayloadCarryingServerCallable callable) {
+      PayloadCarryingServerCallable callable, int rpcTimeout) {
       callsCt.incrementAndGet();
       return new CallerWithFailure(ioe);
     }
@@ -336,7 +337,7 @@ public class TestAsyncProcess {
 
     @Override
     protected RpcRetryingCaller<MultiResponse> createCaller(
-        PayloadCarryingServerCallable payloadCallable) {
+        PayloadCarryingServerCallable payloadCallable, int rpcTimeout) {
       MultiServerCallable<Row> callable = (MultiServerCallable) 
payloadCallable;
       final MultiResponse mr = createMultiResponse(
           callable.getMulti(), nbMultiResponse, nbActions, new 
ResponseGenerator() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/8f9fadf0/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 8436563..f468c16 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -61,6 +61,7 @@ import 
org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
+import org.apache.hadoop.hbase.ipc.CallTimeoutException;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
@@ -123,14 +124,14 @@ public class TestHCM {
 * This copro sleeps 20 second. The first call it fails. The second time, it 
works.
 */
   public static class SleepAndFailFirstTime extends BaseRegionObserver {
-    static final AtomicLong ct = new AtomicLong(0);
-    static final String SLEEP_TIME_CONF_KEY =
-        "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
-    static final long DEFAULT_SLEEP_TIME = 20000;
-    static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
+  static final AtomicLong ct = new AtomicLong(0);
+  static final String SLEEP_TIME_CONF_KEY =
+      "hbase.coprocessor.SleepAndFailFirstTime.sleepTime";
+  static final long DEFAULT_SLEEP_TIME = 20000;
+  static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME);
 
-    public SleepAndFailFirstTime() {
-    }
+  public SleepAndFailFirstTime() {
+  }
 
     @Override
     public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
@@ -141,12 +142,42 @@ public class TestHCM {
 
     @Override
     public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
-              final Get get, final List<Cell> results) throws IOException {
+        final Get get, final List<Cell> results) throws IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+    }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Put put, final WALEdit edit, final Durability durability) throws 
IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+    }
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> 
e,
+        final Delete delete,
+        final WALEdit edit, final Durability durability) throws IOException {
+      Threads.sleep(sleepTime.get());
+      if (ct.incrementAndGet() == 1) {
+        throw new IOException("first call I fail");
+      }
+    }
+
+    @Override
+    public Result preIncrement(final 
ObserverContext<RegionCoprocessorEnvironment> e,
+        final Increment increment) throws IOException {
       Threads.sleep(sleepTime.get());
-      if (ct.incrementAndGet() == 1){
+      if (ct.incrementAndGet() == 1) {
         throw new IOException("first call I fail");
       }
+      return super.preIncrement(e, increment);
     }
+
   }
 
   public static class SleepCoprocessor extends BaseRegionObserver {
@@ -162,16 +193,20 @@ public class TestHCM {
         final Put put, final WALEdit edit, final Durability durability) throws 
IOException {
       Threads.sleep(SLEEP_TIME);
     }
-  }
 
-  public static class SleepWriteCoprocessor extends BaseRegionObserver {
-    public static final int SLEEP_TIME = 5000;
     @Override
     public Result preIncrement(final 
ObserverContext<RegionCoprocessorEnvironment> e,
-                               final Increment increment) throws IOException {
+        final Increment increment) throws IOException {
       Threads.sleep(SLEEP_TIME);
       return super.preIncrement(e, increment);
     }
+
+    @Override
+    public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> 
e, final Delete delete,
+        final WALEdit edit, final Durability durability) throws IOException {
+      Threads.sleep(SLEEP_TIME);
+    }
+
   }
 
   public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver 
{
@@ -365,11 +400,12 @@ public class TestHCM {
    * timeouted when the server answers.
    */
   @Test
-  public void testOperationTimeout() throws Exception {
-    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
+  public void testGetOperationTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout");
     hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
-    HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, 
TEST_UTIL.getConfiguration());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, 
TEST_UTIL.getConfiguration());
     table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
     // Check that it works if the timeout is big enough
     table.setOperationTimeout(120 * 1000);
     table.get(new Get(FAM_NAM));
@@ -393,6 +429,62 @@ public class TestHCM {
   }
 
   @Test
+  public void testPutOperationTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout");
+    hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM 
},TEST_UTIL.getConfiguration());
+    table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
+    // Check that it works if the timeout is big enough
+    table.setOperationTimeout(120 * 1000);
+    table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+
+    // Resetting and retrying. Will fail this time, not enough time for the 
second try
+    SleepAndFailFirstTime.ct.set(0);
+    try {
+      table.setOperationTimeout(30 * 1000);
+      table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+      Assert.fail("We expect an exception here");
+    } catch (RetriesExhaustedWithDetailsException e) {
+      // The client has a CallTimeout class, but it's not shared.We're not 
very clean today,
+      //  in the general case you can expect the call to stop, but the 
exception may vary.
+      // In this test however, we're sure that it will be a socket timeout.
+      LOG.info("We received an exception, as expected ", e);
+    } catch (IOException e) {
+      Assert.fail("Wrong exception:" + e.getMessage());
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
+  public void testDeleteOperationTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout");
+    hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+    Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM 
},TEST_UTIL.getConfiguration());
+    table.setRpcTimeout(Integer.MAX_VALUE);
+    SleepAndFailFirstTime.ct.set(0);
+    // Check that it works if the timeout is big enough
+    table.setOperationTimeout(120 * 1000);
+    table.delete(new Delete(FAM_NAM));
+
+    // Resetting and retrying. Will fail this time, not enough time for the 
second try
+    SleepAndFailFirstTime.ct.set(0);
+    try {
+      table.setOperationTimeout(30 * 1000);
+      table.delete(new Delete(FAM_NAM));
+      Assert.fail("We expect an exception here");
+    } catch (IOException e) {
+      // The client has a CallTimeout class, but it's not shared.We're not 
very clean today,
+      //  in the general case you can expect the call to stop, but the 
exception may vary.
+      // In this test however, we're sure that it will be a socket timeout.
+      LOG.info("We received an exception, as expected ", e);
+    } finally {
+      table.close();
+    }
+  }
+
+  @Test
   public void testRpcTimeout() throws Exception {
     HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
     hdt.addCoprocessor(SleepCoprocessor.class.getName());
@@ -420,14 +512,14 @@ public class TestHCM {
   }
 
   @Test
-  public void testWriteRpcTimeout() throws Exception {
-    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
-    hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
+  public void testIncrementRpcTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
 
     try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
-      t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
-      t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
+      t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
       Increment i = new Increment(FAM_NAM);
       i.addColumn(FAM_NAM, FAM_NAM, 1);
       t.increment(i);
@@ -437,7 +529,7 @@ public class TestHCM {
     }
 
     // Again, with configuration based override
-    c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 
SleepWriteCoprocessor.SLEEP_TIME / 2);
+    c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, 
SleepCoprocessor.SLEEP_TIME / 2);
     try (Connection conn = ConnectionFactory.createConnection(c)) {
       try (Table t = conn.getTable(hdt.getTableName())) {
         Increment i = new Increment(FAM_NAM);
@@ -451,8 +543,46 @@ public class TestHCM {
   }
 
   @Test
-  public void testReadRpcTimeout() throws Exception {
-    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
+  public void testDeleteRpcTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      Delete d = new Delete(FAM_NAM);
+      d.addColumn(FAM_NAM, FAM_NAM, 1);
+      t.delete(d);
+      fail("Write should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+  }
+
+  @Test
+  public void testPutRpcTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+      t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+      Put p = new Put(FAM_NAM);
+      p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
+      t.put(p);
+      fail("Write should not have succeeded");
+    } catch (RetriesExhaustedException e) {
+      // expected
+    }
+
+  }
+
+  @Test
+  public void testGetRpcTimeout() throws Exception {
+    HTableDescriptor hdt = 
TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout");
     hdt.addCoprocessor(SleepCoprocessor.class.getName());
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
 
@@ -503,6 +633,7 @@ public class TestHCM {
     TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
 
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    SleepAndFailFirstTime.ct.set(0);
     c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
     c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
 
@@ -1009,8 +1140,7 @@ public class TestHCM {
       curServer.getServerName().getPort(),
       conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
 
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 
RPC_RETRY);
     table.close();
   }
 

Reply via email to