Repository: hbase
Updated Branches:
  refs/heads/0.98 267ad3b99 -> 9ce175146


HBASE-11347 For some errors, the client can retry infinitely


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9ce17514
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9ce17514
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9ce17514

Branch: refs/heads/0.98
Commit: 9ce175146f4dd8ad984eac69819d8cd375368e61
Parents: 267ad3b
Author: Nicolas Liochon <[email protected]>
Authored: Sat Jun 14 09:16:03 2014 +0200
Committer: Nicolas Liochon <[email protected]>
Committed: Sat Jun 14 09:16:03 2014 +0200

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       |  3 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 82 ++++++++++++++++----
 2 files changed, 70 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9ce17514/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index bbb2fdf..0c9ce20 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -646,11 +646,12 @@ class AsyncProcess<CResult> {
     hConnection.updateCachedLocations(tableName,
       rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location);
     errorsByServer.reportServerError(location);
+    boolean canRetry = errorsByServer.canRetryMore(numAttempt);
 
     List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size());
     for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) {
       for (Action<Row> action : e.getValue()) {
-        if (manageError(action.getOriginalIndex(), action.getAction(), true, t, location)) {
+        if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, location)) {
           toReplay.add(action);
         }
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9ce17514/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 4c93f44..a9402cc 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -74,24 +74,24 @@ public class TestAsyncProcess {
   private static final String success = "success";
   private static Exception failure = new Exception("failure");
 
+  static class CountingThreadFactory implements ThreadFactory {
+    final AtomicInteger nbThreads;
+    ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
+    @Override
+    public Thread newThread(Runnable r) {
+      nbThreads.incrementAndGet();
+      return realFactory.newThread(r);
+    }
+
+    CountingThreadFactory(AtomicInteger nbThreads){
+      this.nbThreads = nbThreads;
+    }
+  }
+
   static class MyAsyncProcess<Res> extends AsyncProcess<Res> {
     final AtomicInteger nbMultiResponse = new AtomicInteger();
     final AtomicInteger nbActions = new AtomicInteger();
 
-    static class CountingThreadFactory implements ThreadFactory {
-      final AtomicInteger nbThreads;
-      ThreadFactory realFactory =  Threads.newDaemonThreadFactory("test-TestAsyncProcess");
-      @Override
-      public Thread newThread(Runnable r) {
-        nbThreads.incrementAndGet();
-        return realFactory.newThread(r);
-      }
-
-      CountingThreadFactory(AtomicInteger nbThreads){
-        this.nbThreads = nbThreads;
-      }
-    }
-
     public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) {
       this(hc, callback, conf, new AtomicInteger());
     }
@@ -124,6 +124,33 @@ public class TestAsyncProcess {
     }
   }
 
+  static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{
+
+    public CallerWithFailure() {
+      super(100, 100);
+    }
+
+    @Override
+    public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable)
+      throws IOException, RuntimeException {
+      throw new IOException("test");
+    }
+  }
+
+  static class AsyncProcessWithFailure<Res> extends MyAsyncProcess<Res> {
+
+    public AsyncProcessWithFailure(HConnection hc, Configuration conf) {
+      super(hc, null, conf, new AtomicInteger());
+      serverTrackerTimeout = 1;
+    }
+
+    @Override
+    protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) {
+      return new CallerWithFailure();
+    }
+  }
+
+
   static MultiResponse createMultiResponse(final HRegionLocation loc,
       final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) {
     final MultiResponse mr = new MultiResponse();
@@ -707,6 +734,33 @@ public class TestAsyncProcess {
     Assert.assertEquals(ht.ap.tasksSent.get(), 3);
   }
 
+  @Test
+  public void testGlobalErrors() throws IOException {
+    HTable ht = new HTable();
+    Configuration configuration = new Configuration(conf);
+    configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true);
+    configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    ht.connection = new MyConnectionImpl(configuration);
+    AsyncProcessWithFailure<Object> ap =
+      new AsyncProcessWithFailure<Object>(ht.connection, configuration);
+    ht.ap = ap;
+
+    Assert.assertNotNull(ht.ap.createServerErrorTracker());
+
+    Put p = createPut(1, true);
+    ht.setAutoFlush(false, false);
+    ht.put(p);
+
+    try {
+      ht.flushCommits();
+      Assert.fail();
+    } catch (RetriesExhaustedWithDetailsException expected) {
+    }
+    // Checking that the ErrorsServers came into play and didn't make us stop immediately
+    Assert.assertEquals(3, ht.ap.tasksSent.get());
+  }
+
+
   /**
    * This test simulates multiple regions on 2 servers. We should have 2 multi requests and
    *  2 threads: 1 per server, this whatever the number of regions.

Reply via email to