Repository: hbase Updated Branches: refs/heads/master 9cb0094bd -> 0f92e943a
HBASE-16515 AsyncProcess has incorrect count of tasks if the backoff policy is enabled (ChiaPing Tsai) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0f92e943 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0f92e943 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0f92e943 Branch: refs/heads/master Commit: 0f92e943aca497d60358b0ce32ec690a04e4fd85 Parents: 9cb0094 Author: tedyu <[email protected]> Authored: Tue Aug 30 08:01:49 2016 -0700 Committer: tedyu <[email protected]> Committed: Tue Aug 30 08:01:49 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 6 +- .../hadoop/hbase/client/TestAsyncProcess.java | 79 +++++++++++++++----- 2 files changed, 66 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0f92e943/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 045885f..5bb0f58 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -1066,7 +1066,6 @@ class AsyncProcess { for (Map.Entry<ServerName, MultiAction<Row>> e : actionsByServer.entrySet()) { ServerName server = e.getKey(); MultiAction<Row> multiAction = e.getValue(); - incTaskCounters(multiAction.getRegions(), server); Collection<?
extends Runnable> runnables = getNewMultiActionRunnable(server, multiAction, numAttempt); // make sure we correctly count the number of runnables before we try to reuse the send @@ -1114,6 +1113,7 @@ class AsyncProcess { if (connection.getConnectionMetrics() != null) { connection.getConnectionMetrics().incrNormalRunners(); } + incTaskCounters(multiAction.getRegions(), server); SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); @@ -1136,6 +1136,7 @@ class AsyncProcess { List<Runnable> toReturn = new ArrayList<Runnable>(actions.size()); for (DelayingRunner runner : actions.values()) { + incTaskCounters(runner.getActions().getRegions(), server); String traceText = "AsyncProcess.sendMultiAction"; Runnable runnable = addSingleServerRequestHeapSize(server, new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); @@ -1757,7 +1758,8 @@ class AsyncProcess { } } - private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { + @VisibleForTesting + protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { boolean metrics = AsyncProcess.this.connection.getConnectionMetrics() != null; boolean stats = AsyncProcess.this.connection.getStatisticsTracker() != null; if (!stats && !metrics) { http://git-wip-us.apache.org/repos/asf/hbase/blob/0f92e943/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 516f2cf..00f5232 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java 
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -82,24 +82,10 @@ import org.junit.rules.TestRule; import org.mockito.Mockito; import org.apache.hadoop.hbase.client.AsyncProcess.RowChecker; import org.apache.hadoop.hbase.client.AsyncProcess.SubmittedSizeChecker; +import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy; +import org.apache.hadoop.hbase.client.backoff.ServerStatistics; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -225,6 +211,11 @@ public class TestAsyncProcess { return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true); } + + @Override + protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { + // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. + } @Override protected RpcRetryingCaller<MultiResponse> createCaller( CancellableRegionServerCallable callable) { @@ -295,7 +286,21 @@ public class TestAsyncProcess { return new CallerWithFailure(ioe); } } - + /** + * Make the backoff time always different on each call. 
+ */ + static class MyClientBackoffPolicy implements ClientBackoffPolicy { + private final Map<ServerName, AtomicInteger> count = new HashMap<>(); + @Override + public long getBackoffTime(ServerName serverName, byte[] region, ServerStatistics stats) { + AtomicInteger inc = count.get(serverName); + if (inc == null) { + inc = new AtomicInteger(0); + count.put(serverName, inc); + } + return inc.getAndIncrement(); + } + } class MyAsyncProcessWithReplicas extends MyAsyncProcess { private Set<byte[]> failures = new TreeSet<byte[]>(new Bytes.ByteArrayComparator()); private long primarySleepMs = 0, replicaSleepMs = 0; @@ -836,6 +841,46 @@ public class TestAsyncProcess { } @Test + public void testTaskCountWithoutClientBackoffPolicy() throws IOException, InterruptedException { + ClusterConnection hc = createHConnection(); + MyAsyncProcess ap = new MyAsyncProcess(hc, conf, false); + testTaskCount(ap); + } + + @Test + public void testTaskCountWithClientBackoffPolicy() throws IOException, InterruptedException { + Configuration copyConf = new Configuration(conf); + copyConf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true); + MyClientBackoffPolicy bp = new MyClientBackoffPolicy(); + ClusterConnection hc = createHConnection(); + Mockito.when(hc.getConfiguration()).thenReturn(copyConf); + Mockito.when(hc.getStatisticsTracker()).thenReturn(ServerStatisticTracker.create(copyConf)); + Mockito.when(hc.getBackoffPolicy()).thenReturn(bp); + MyAsyncProcess ap = new MyAsyncProcess(hc, copyConf, false); + testTaskCount(ap); + } + + private void testTaskCount(AsyncProcess ap) throws InterruptedIOException, InterruptedException { + List<Put> puts = new ArrayList<>(); + for (int i = 0; i != 3; ++i) { + puts.add(createPut(1, true)); + puts.add(createPut(2, true)); + puts.add(createPut(3, true)); + } + ap.submit(DUMMY_TABLE, puts, true, null, false); + ap.waitUntilDone(); + // More time to wait if there are incorrect task count. 
+ TimeUnit.SECONDS.sleep(1); + assertEquals(0, ap.tasksInProgress.get()); + for (AtomicInteger count : ap.taskCounterPerRegion.values()) { + assertEquals(0, count.get()); + } + for (AtomicInteger count : ap.taskCounterPerServer.values()) { + assertEquals(0, count.get()); + } + } + + @Test public void testMaxTask() throws Exception { final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf, false);
