Repository: hbase Updated Branches: refs/heads/master 84ed7cf64 -> 3fa92647d
HBASE-11347 For some errors, the client can retry infinitely Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3fa92647 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3fa92647 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3fa92647 Branch: refs/heads/master Commit: 3fa92647d24dc87cc958b33c5becd8dda16d1326 Parents: 84ed7cf Author: Nicolas Liochon <[email protected]> Authored: Sat Jun 14 08:45:07 2014 +0200 Committer: Nicolas Liochon <[email protected]> Committed: Sat Jun 14 08:45:07 2014 +0200 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 12 +- .../hadoop/hbase/client/TestAsyncProcess.java | 138 +++++++++++++------ 2 files changed, 98 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/3fa92647/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 433322f..fb3612f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -522,9 +522,6 @@ class AsyncProcess { private final Object[] results; private final long nonceGroup; - @VisibleForTesting - protected AtomicInteger hardRetryLimit = null; // used for tests to stop retries. - public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback) { @@ -558,7 +555,7 @@ class AsyncProcess { final Map<ServerName, MultiAction<Row>> actionsByServer = new HashMap<ServerName, MultiAction<Row>>(); - HRegionLocation loc = null; + HRegionLocation loc; for (Action<Row> action : currentActions) { try { loc = findDestLocation(tableName, action.getAction()); @@ -661,10 +658,6 @@ class AsyncProcess { canRetry = false; } - if (canRetry && hardRetryLimit != null) { - canRetry = hardRetryLimit.decrementAndGet() >= 0; - } - if (!canRetry) { // Batch.Callback<Res> was not called on failure in 0.94. We keep this. errors.add(throwable, row, server); @@ -692,11 +685,12 @@ class AsyncProcess { byte[] row = rsActions.actions.values().iterator().next().get(0).getAction().getRow(); hConnection.updateCachedLocations(tableName, row, null, server); errorsByServer.reportServerError(server); + boolean canRetry = errorsByServer.canRetryMore(numAttempt); List<Action<Row>> toReplay = new ArrayList<Action<Row>>(); for (Map.Entry<byte[], List<Action<Row>>> e : rsActions.actions.entrySet()) { for (Action<Row> action : e.getValue()) { - if (manageError(action.getOriginalIndex(), action.getAction(), true, t, server)) { + if (manageError(action.getOriginalIndex(), action.getAction(), canRetry, t, server)) { toReplay.add(action); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/3fa92647/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index c31e451..edffd18 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.Mockito; @@ -45,6 +46,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -77,10 +79,32 @@ public class TestAsyncProcess { private static final String success = "success"; private static Exception failure = new Exception("failure"); + private static int NB_RETRIES = 3; + + @BeforeClass + public static void beforeClass(){ + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, NB_RETRIES); + } + + static class CountingThreadFactory implements ThreadFactory { + final AtomicInteger nbThreads; + ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess"); + @Override + public Thread newThread(Runnable r) { + nbThreads.incrementAndGet(); + return realFactory.newThread(r); + } + + CountingThreadFactory(AtomicInteger nbThreads){ + this.nbThreads = nbThreads; + } + } + static class MyAsyncProcess extends AsyncProcess { final AtomicInteger nbMultiResponse = new AtomicInteger(); final AtomicInteger nbActions = new AtomicInteger(); public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>(); + public AtomicInteger callsCt = new AtomicInteger(); @Override protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, @@ -89,36 +113,11 @@ public class TestAsyncProcess { // Test HTable has tableName of null, so pass DUMMY_TABLE AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture( DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); - r.hardRetryLimit = new AtomicInteger(1); allReqs.add(r); + callsCt.incrementAndGet(); return r; } - @SuppressWarnings("unchecked") - public long getRetriesRequested() { - long result = 0; - for (AsyncRequestFuture ars : allReqs) { - if (ars instanceof AsyncProcess.AsyncRequestFutureImpl) { - result += (1 - ((AsyncRequestFutureImpl<?>)ars).hardRetryLimit.get()); - } - } - return result; - } - - static class CountingThreadFactory implements ThreadFactory { - final AtomicInteger nbThreads; - ThreadFactory realFactory = Threads.newDaemonThreadFactory("test-TestAsyncProcess"); - @Override - public Thread newThread(Runnable r) { - nbThreads.incrementAndGet(); - return realFactory.newThread(r); - } - - CountingThreadFactory(AtomicInteger nbThreads){ - this.nbThreads = nbThreads; - } - } - public MyAsyncProcess(ClusterConnection hc, Configuration conf) { this(hc, conf, new AtomicInteger()); } @@ -136,6 +135,17 @@ public class TestAsyncProcess { new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); } + public MyAsyncProcess( + ClusterConnection hc, Configuration conf, boolean useGlobalErrors, boolean dummy) { + super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, + new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())) { + public void execute(Runnable command) { + throw new RejectedExecutionException("test under failure"); + } + }, + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + } + @Override public <Res> AsyncRequestFuture submit(TableName tableName, List<? extends Row> rows, boolean atLeastOne, Callback<Res> callback, boolean needResults) @@ -146,6 +156,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { + callsCt.incrementAndGet(); final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions); return new RpcRetryingCaller<MultiResponse>(100, 10) { @@ -166,6 +177,33 @@ public class TestAsyncProcess { } } + static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{ + + public CallerWithFailure() { + super(100, 100); + } + + @Override + public MultiResponse callWithoutRetries(RetryingCallable<MultiResponse> callable, int callTimeout) + throws IOException, RuntimeException { + throw new IOException("test"); + } + } + + static class AsyncProcessWithFailure extends MyAsyncProcess { + + public AsyncProcessWithFailure(ClusterConnection hc, Configuration conf) { + super(hc, conf, true); + serverTrackerTimeout = 1; + } + + @Override + protected RpcRetryingCaller<MultiResponse> createCaller(MultiServerCallable<Row> callable) { + callsCt.incrementAndGet(); + return new CallerWithFailure(); + } + } + static MultiResponse createMultiResponse( final MultiAction<Row> multi, AtomicInteger nbMultiResponse, AtomicInteger nbActions) { final MultiResponse mr = new MultiResponse(); @@ -188,15 +226,7 @@ public class TestAsyncProcess { */ static class MyConnectionImpl extends ConnectionManager.HConnectionImplementation { final AtomicInteger nbThreads = new AtomicInteger(0); - final static Configuration c = new Configuration(); - - static { - c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); - } - protected MyConnectionImpl() { - super(c); - } protected MyConnectionImpl(Configuration conf) { super(conf); @@ -217,7 +247,7 @@ public class TestAsyncProcess { final boolean usedRegions[]; protected MyConnectionImpl2(List<HRegionLocation> hrl) { - super(c); + super(conf); this.hrl = hrl; this.usedRegions = new boolean[hrl.size()]; } @@ -320,7 +350,7 @@ public class TestAsyncProcess { Assert.assertEquals(0, puts.size()); ars.waitUntilDone(); verifyResult(ars, false); - Assert.assertEquals(2L, ap.getRetriesRequested()); + Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); Assert.assertEquals(1, ars.getErrors().exceptions.size()); Assert.assertTrue("was: " + ars.getErrors().exceptions.get(0), @@ -386,7 +416,8 @@ public class TestAsyncProcess { Assert.assertTrue(puts.isEmpty()); ars.waitUntilDone(); verifyResult(ars, false, true, true); - Assert.assertEquals(2, ap.getRetriesRequested()); + Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); + ap.callsCt.set(0); Assert.assertEquals(1, ars.getErrors().actions.size()); puts.add(createPut(1, true)); @@ -395,7 +426,7 @@ public class TestAsyncProcess { ars = ap.submit(DUMMY_TABLE, puts, false, null, true); Assert.assertEquals(0, puts.size()); ars.waitUntilDone(); - Assert.assertEquals(2, ap.getRetriesRequested()); + Assert.assertEquals(2, ap.callsCt.get()); verifyResult(ars, true); } @@ -411,7 +442,7 @@ public class TestAsyncProcess { AsyncRequestFuture ars = ap.submit(DUMMY_TABLE, puts, false, null, true); ars.waitUntilDone(); verifyResult(ars, false, true, true); - Assert.assertEquals(2, ap.getRetriesRequested()); + Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); Assert.assertEquals(1, ars.getFailedOperations().size()); } @@ -608,7 +639,7 @@ public class TestAsyncProcess { @Test public void testBatch() throws IOException, InterruptedException { HTable ht = new HTable(); - ht.connection = new MyConnectionImpl(); + ht.connection = new MyConnectionImpl(conf); ht.multiAp = new MyAsyncProcess(ht.connection, conf, false); List<Put> puts = new ArrayList<Put>(); @@ -641,12 +672,11 @@ public class TestAsyncProcess { HTable ht = new HTable(); Configuration configuration = new Configuration(conf); configuration.setBoolean(ConnectionManager.RETRIES_BY_SERVER_KEY, true); - configuration.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); // set default writeBufferSize ht.setWriteBufferSize(configuration.getLong("hbase.client.write.buffer", 2097152)); ht.connection = new MyConnectionImpl(configuration); - MyAsyncProcess ap = new MyAsyncProcess(ht.connection, conf, true); + MyAsyncProcess ap = new MyAsyncProcess(ht.connection, configuration, true); ht.ap = ap; Assert.assertNotNull(ht.ap.createServerErrorTracker()); @@ -663,7 +693,29 @@ public class TestAsyncProcess { } catch (RetriesExhaustedWithDetailsException expected) { } // Checking that the ErrorsServers came into play and didn't make us stop immediately - Assert.assertEquals(2, ap.getRetriesRequested()); + Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); + } + + @Test + public void testGlobalErrors() throws IOException { + HTable ht = new HTable(); + ht.connection = new MyConnectionImpl(conf); + AsyncProcessWithFailure ap = new AsyncProcessWithFailure(ht.connection, conf); + ht.ap = ap; + + Assert.assertNotNull(ht.ap.createServerErrorTracker()); + + Put p = createPut(1, true); + ht.setAutoFlush(false, false); + ht.put(p); + + try { + ht.flushCommits(); + Assert.fail(); + } catch (RetriesExhaustedWithDetailsException expected) { + } + // Checking that the ErrorsServers came into play and didn't make us stop immediately + Assert.assertEquals(NB_RETRIES + 1, ap.callsCt.get()); } /**
