Repository: hbase Updated Branches: refs/heads/0.98 267ad3b99 -> 9ce175146
HBASE-11347 For some errors, the client can retry infinitely Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9ce17514 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9ce17514 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9ce17514 Branch: refs/heads/0.98 Commit: 9ce175146f4dd8ad984eac69819d8cd375368e61 Parents: 267ad3b Author: Nicolas Liochon <[email protected]> Authored: Sat Jun 14 09:16:03 2014 +0200 Committer: Nicolas Liochon <[email protected]> Committed: Sat Jun 14 09:16:03 2014 +0200 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 3 +- .../hadoop/hbase/client/TestAsyncProcess.java | 82 ++++++++++++++++---- 2 files changed, 70 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/9ce17514/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index bbb2fdf..0c9ce20 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -646,11 +646,12 @@ class AsyncProcess<CResult> { hConnection.updateCachedLocations(tableName, rsActions.actions.values().iterator().next().get(0).getAction().getRow(), null, location); errorsByServer.reportServerError(location); + boolean canRetry = errorsByServer.canRetryMore(numAttempt); List<Action<Row>> toReplay = new ArrayList<Action<Row>>(initialActions.size()); for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) { for (Action<Row> action : e.getValue()) { - if (manageError(action.getOriginalIndex(), action.getAction(), true, t, location)) { + if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, location)) { toReplay.add(action); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/9ce17514/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 4c93f44..a9402cc 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -74,24 +74,24 @@ public class TestAsyncProcess { private static final String success = "success"; private static Exception failure = new Exception("failure"); + static class CountingThreadFactory implements ThreadFactory { + final AtomicInteger nbThreads; + ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess"); + @Override + public Thread newThread(Runnable r) { + nbThreads.incrementAndGet(); + return realFactory.newThread(r); + } + + CountingThreadFactory(AtomicInteger nbThreads){ + this.nbThreads = nbThreads; + } + } + static class MyAsyncProcess<Res> extends AsyncProcess<Res> { final AtomicInteger nbMultiResponse = new AtomicInteger(); final AtomicInteger nbActions = new AtomicInteger(); - static class CountingThreadFactory implements ThreadFactory { - final AtomicInteger nbThreads; - ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess"); - @Override - public Thread newThread(Runnable r) { - nbThreads.incrementAndGet(); - return realFactory.newThread(r); - } - - CountingThreadFactory(AtomicInteger nbThreads){ - this.nbThreads = nbThreads; - } - } - public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback, Configuration conf) { this(hc, callback, conf, new AtomicInteger()); } @@ -124,6 +124,33 @@ public class TestAsyncProcess { } } + static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{ + + public CallerWithFailure() { + super(100, 100); + } + + @Override + public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable) + throws IOException, RuntimeException { + throw new IOException("test"); + } + } + + static class AsyncProcessWithFailure<Res> extends MyAsyncProcess<Res> { + + public AsyncProcessWithFailure(HConnection hc, Configuration conf) { + super(hc, null, conf, new AtomicInteger()); + serverTrackerTimeout = 1; + } + + @Override + protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { + return new CallerWithFailure(); + } + } + + static MultiResponse createMultiResponse(final HRegionLocation loc, final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) { final MultiResponse mr = new MultiResponse(); @@ -707,6 +734,33 @@ public class TestAsyncProcess { Assert.assertEquals(ht.ap.tasksSent.get(), 3); } + @Test + public void testGlobalErrors() throws IOException { + HTable ht = new HTable(); + Configuration configuration = new Configuration(conf); + configuration.setBoolean(HConnectionManager.RETRIES_BY_SERVER_KEY, true); + configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + ht.connection = new MyConnectionImpl(configuration); + AsyncProcessWithFailure<Object> ap = + new AsyncProcessWithFailure<Object>(ht.connection, configuration); + ht.ap = ap; + + Assert.assertNotNull(ht.ap.createServerErrorTracker()); + + Put p = createPut(1, true); + ht.setAutoFlush(false, false); + ht.put(p); + + try { + ht.flushCommits(); + Assert.fail(); + } catch (RetriesExhaustedWithDetailsException expected) { + } + // Checking that the ErrorsServers came into play and didn't make us stop immediately + Assert.assertEquals(3, ht.ap.tasksSent.get()); + } + + /** * This test simulates multiple regions on 2 servers. We should have 2 multi requests and * 2 threads: 1 per server, this whatever the number of regions.
