Repository: hbase Updated Branches: refs/heads/master 7c19490ba -> e2a070cae
HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2a070ca Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2a070ca Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2a070ca Branch: refs/heads/master Commit: e2a070cae0ab785b771a923146116c0e9f3452a5 Parents: 7c19490 Author: CHIA-PING TSAI <[email protected]> Authored: Mon Mar 13 14:33:36 2017 +0800 Committer: Chia-Ping Tsai <[email protected]> Committed: Fri Mar 17 07:49:13 2017 +0800 ---------------------------------------------------------------------- .../hbase/client/AsyncRequestFutureImpl.java | 49 ++++---------------- .../hadoop/hbase/client/TestAsyncProcess.java | 33 +++++++++++-- 2 files changed, 40 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e2a070ca/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 41431bb..e6e4fd1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -28,7 +28,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -181,13 +180,14 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { * Runnable (that can be submitted to thread pool) that submits MultiAction to a * single server. The server call is synchronous, therefore we do it on a thread pool. */ - private final class SingleServerRequestRunnable implements Runnable { + @VisibleForTesting + final class SingleServerRequestRunnable implements Runnable { private final MultiAction multiAction; private final int numAttempt; private final ServerName server; private final Set<CancellableRegionServerCallable> callsInProgress; - private Long heapSize = null; - private SingleServerRequestRunnable( + @VisibleForTesting + SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, Set<CancellableRegionServerCallable> callsInProgress) { this.multiAction = multiAction; @@ -196,24 +196,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { this.callsInProgress = callsInProgress; } - @VisibleForTesting - long heapSize() { - if (heapSize != null) { - return heapSize; - } - heapSize = 0L; - for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) { - List<Action> actions = e.getValue(); - for (Action action: actions) { - Row row = action.getAction(); - if (row instanceof Mutation) { - heapSize += ((Mutation) row).heapSize(); - } - } - } - return heapSize; - } - @Override public void run() { AbstractResponse res = null; @@ -303,7 +285,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { private final CancellableRegionServerCallable currentCallable; private final int operationTimeout; private final int rpcTimeout; - private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); private final AsyncProcess asyncProcess; /** @@ -423,20 +404,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { } @VisibleForTesting - Map<ServerName, List<Long>> getRequestHeapSize() { - return heapSizesByServer; + SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server, + Set<CancellableRegionServerCallable> callsInProgress) { + return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress); } - private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server, - SingleServerRequestRunnable runnable) { - List<Long> heapCount = heapSizesByServer.get(server); - if (heapCount == null) { - heapCount = new LinkedList<>(); - heapSizesByServer.put(server, heapCount); - } - heapCount.add(runnable.heapSize()); - return runnable; - } /** * Group a list of actions per region servers, and send them. * @@ -608,8 +580,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { asyncProcess.connection.getConnectionMetrics().incrNormalRunners(); } asyncProcess.incTaskCounters(multiAction.getRegions(), server); - SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress)); + SingleServerRequestRunnable runnable = createSingleServerRequest( + multiAction, numAttempt, server, callsInProgress); return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable)); } @@ -631,8 +603,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture { for (DelayingRunner runner : actions.values()) { asyncProcess.incTaskCounters(runner.getActions().getRegions(), server); String traceText = "AsyncProcess.sendMultiAction"; - Runnable runnable = addSingleServerRequestHeapSize(server, - new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress)); + Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress); // use a delay runner only if we need to sleep for some time if (runner.getSleepTime() > 0) { runner.setRunner(runnable); http://git-wip-us.apache.org/repos/asf/hbase/blob/e2a070ca/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index f2f0467..3139af1 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -261,7 +261,7 @@ public class TestAsyncProcess { static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> { - + private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>(); public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions, long nonceGroup, AsyncProcess asyncProcess) { super(task, actions, nonceGroup, asyncProcess); @@ -272,6 +272,33 @@ public class TestAsyncProcess { // Do nothing for avoiding the NPE if we test the ClientBackofPolicy. } + Map<ServerName, List<Long>> getRequestHeapSize() { + return heapSizesByServer; + } + + @Override + SingleServerRequestRunnable createSingleServerRequest( + MultiAction multiAction, int numAttempt, ServerName server, + Set<CancellableRegionServerCallable> callsInProgress) { + SingleServerRequestRunnable rq = new SingleServerRequestRunnable( + multiAction, numAttempt, server, callsInProgress); + List<Long> heapCount = heapSizesByServer.get(server); + if (heapCount == null) { + heapCount = new ArrayList<>(); + heapSizesByServer.put(server, heapCount); + } + heapCount.add(heapSizeOf(multiAction)); + return rq; + } + + private long heapSizeOf(MultiAction multiAction) { + return multiAction.actions.values().stream() + .flatMap(v -> v.stream()) + .map(action -> action.getAction()) + .filter(row -> row instanceof Mutation) + .mapToLong(row -> ((Mutation) row).heapSize()) + .sum(); + } } static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{ @@ -635,7 +662,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; if (ars.getRequestHeapSize().containsKey(sn)) { ++actualSnReqCount; } @@ -651,7 +678,7 @@ public class TestAsyncProcess { if (!(req instanceof AsyncRequestFutureImpl)) { continue; } - AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req; + MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req; Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize(); for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) { long sum = 0;
