Repository: hbase Updated Branches: refs/heads/branch-1 d1ea718e4 -> d542b446b
HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d542b446 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d542b446 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d542b446 Branch: refs/heads/branch-1 Commit: d542b446b824315800b994cc3de7d8950a2ca671 Parents: d1ea718 Author: CHIA-PING TSAI <[email protected]> Authored: Mon Mar 13 19:28:57 2017 +0800 Committer: Chia-Ping Tsai <[email protected]> Committed: Fri Mar 17 07:52:45 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 67 +++++--------------- .../hadoop/hbase/client/TestAsyncProcess.java | 59 +++++++++++++++-- 2 files changed, 69 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d542b446/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 5bd9a4f..73cafc1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -404,7 +404,8 @@ class AsyncProcess { * @return pool if non null, otherwise returns this.pool if non null, otherwise throws * RuntimeException */ - private ExecutorService getPool(ExecutorService pool) { + @VisibleForTesting + ExecutorService getPool(ExecutorService pool) { if (pool != null) { return pool; } @@ -551,7 +552,8 @@ class AsyncProcess { List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer, ExecutorService pool) { AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( - tableName, retainedActions, nonceGroup, pool, callback, results, needResults); + tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, + operationTimeout, rpcTimeout); // Add location errors if any if (locationErrors != null) { for (int i = 0; i < locationErrors.size(); ++i) { @@ -759,13 +761,14 @@ class AsyncProcess { * Runnable (that can be submitted to thread pool) that submits MultiAction to a * single server. The server call is synchronous, therefore we do it on a thread pool. */ - private final class SingleServerRequestRunnable implements Runnable { + @VisibleForTesting + class SingleServerRequestRunnable implements Runnable { private final MultiAction<Row> multiAction; private final int numAttempt; private final ServerName server; private final Set<PayloadCarryingServerCallable> callsInProgress; - private Long heapSize = null; - private SingleServerRequestRunnable( + @VisibleForTesting + SingleServerRequestRunnable( MultiAction<Row> multiAction, int numAttempt, ServerName server, Set<PayloadCarryingServerCallable> callsInProgress) { this.multiAction = multiAction; @@ -774,24 +777,6 @@ class AsyncProcess { this.callsInProgress = callsInProgress; } - @VisibleForTesting - long heapSize() { - if (heapSize != null) { - return heapSize; - } - heapSize = 0L; - for (Map.Entry<byte[], List<Action<Row>>> e: this.multiAction.actions.entrySet()) { - List<Action<Row>> actions = e.getValue(); - for (Action<Row> action: actions) { - Row row = action.getAction(); - if (row instanceof Mutation) { - heapSize += ((Mutation) row).heapSize(); - } - } - } - return heapSize; - } - @Override public void run() { MultiResponse res; @@ -874,7 +859,6 @@ class AsyncProcess { private PayloadCarryingServerCallable currentCallable; private int operationTimeout; private int rpcTimeout; - private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); private RetryingTimeTracker tracker; public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, @@ -961,21 +945,13 @@ class AsyncProcess { public Set<PayloadCarryingServerCallable> getCallsInProgress() { return callsInProgress; } + @VisibleForTesting - Map<ServerName, List<Long>> getRequestHeapSize() { - return heapSizesByServer; + SingleServerRequestRunnable createSingleServerRequest(MultiAction<Row> multiAction, int numAttempt, ServerName server, + Set<PayloadCarryingServerCallable> callsInProgress) { + return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); } - private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, - SingleServerRequestRunnable runnable) { - List<Long> heapCount = heapSizesByServer.get(server); - if (heapCount == null) { - heapCount = new LinkedList<>(); - heapSizesByServer.put(server, heapCount); - } - heapCount.add(runnable.heapSize()); - return runnable; - } /** * Group a list of actions per region servers, and send them. * @@ -1148,8 +1124,7 @@ class AsyncProcess { connection.getConnectionMetrics().incrNormalRunners(); } incTaskCounters(multiAction.getRegions(), server); - SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + SingleServerRequestRunnable runnable = createSingleServerRequest(multiAction, numAttempt, server, callsInProgress); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); } @@ -1172,8 +1147,7 @@ class AsyncProcess { for (DelayingRunner runner : actions.values()) { incTaskCounters(runner.getActions().getRegions(), server); String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); + Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); @@ -1829,7 +1803,8 @@ class AsyncProcess { } } - protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( + @VisibleForTesting + <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<CResult> callback, Object[] results, boolean needResults, PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { @@ -1838,16 +1813,6 @@ class AsyncProcess { results, callback, callable, operationTimeout, rpcTimeout); } - @VisibleForTesting - /** Create AsyncRequestFuture. Isolated to be easily overridden in the tests. */ - protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( - TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, - Batch.Callback<CResult> callback, Object[] results, boolean needResults) { - return createAsyncRequestFuture( - tableName, actions, nonceGroup, pool, callback, results, needResults, null, - operationTimeout, rpcTimeout); - } - /** * Create a caller. Isolated to be easily overridden in the tests. */ http://git-wip-us.apache.org/repos/asf/hbase/blob/d542b446/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 027d362..8c0b7df 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -153,13 +153,16 @@ public class TestAsyncProcess { public AtomicInteger callsCt = new AtomicInteger(); private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); private long previousTimeout = -1; + @Override - protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, + <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, - Batch.Callback<Res> callback, Object[] results, boolean needResults) { + Batch.Callback<Res> callback, Object[] results, boolean needResults, + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { // Test HTable has tableName of null, so pass DUMMY_TABLE - AsyncRequestFutureImpl<Res> r = super.createAsyncRequestFuture( - DUMMY_TABLE, actions, nonceGroup, pool, callback, results, needResults); + MyAsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl( + DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults, results, callback, callable, + operationTimeout, rpcTimeout); allReqs.add(r); callsCt.incrementAndGet(); return r; @@ -254,6 +257,50 @@ public class TestAsyncProcess { } }; } + + class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { + + private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); + + MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, + ExecutorService pool, boolean needResults, Object[] results, + Batch.Callback<Res> callback, PayloadCarryingServerCallable callable, + int operationTimeout, int rpcTimeout) { + super(tableName, actions, nonceGroup, pool, needResults, results, callback, callable, operationTimeout, rpcTimeout); + } + + Map<ServerName, List<Long>> getRequestHeapSize() { + return heapSizesByServer; + } + + @Override + SingleServerRequestRunnable createSingleServerRequest( + MultiAction<Row> multiAction, int numAttempt, ServerName server, + Set<PayloadCarryingServerCallable> callsInProgress) { + SingleServerRequestRunnable rq = new SingleServerRequestRunnable( + multiAction, numAttempt, server, callsInProgress); + List<Long> heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new ArrayList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(heapSizeOf(multiAction)); + return rq; + } + + long heapSizeOf(MultiAction<Row> multiAction) { + long sum = 0; + for (List<Action<Row>> actions : multiAction.actions.values()) { + for (Action action : actions) { + Row row = action.getAction(); + if (row instanceof Mutation) { + sum += ((Mutation) row).heapSize(); + } + } + } + return sum; + } + } } static class CallerWithFailure extends RpcRetryingCaller<MultiResponse>{ @@ -644,7 +691,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl) req; if (ars.getRequestHeapSize().containsKey(sn)) { ++actualSnReqCount; } @@ -660,7 +707,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncProcess.MyAsyncRequestFutureImpl ars = (MyAsyncProcess.MyAsyncRequestFutureImpl) req; Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { long sum = 0;
