This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 0645d5a  HBASE-22267 Implement client push back for async client
0645d5a is described below

commit 0645d5a0337c649c17860a733476e81e1186a328
Author: zhangduo <[email protected]>
AuthorDate: Sun Apr 21 11:58:52 2019 +0800

    HBASE-22267 Implement client push back for async client
---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  | 119 ++++++++----
 .../hadoop/hbase/client/AsyncConnectionImpl.java   |  15 ++
 .../hbase/client/AsyncRequestFutureImpl.java       |  42 +----
 .../hadoop/hbase/client/ConnectionUtils.java       |  23 +++
 .../hadoop/hbase/client/MetricsConnection.java     |   8 +-
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  12 +-
 .../hbase/client/ServerStatisticTracker.java       |  10 +-
 .../hadoop/hbase/client/TestAsyncProcess.java      |   2 +-
 ...ntPushback.java => ClientPushbackTestBase.java} | 112 +++++------
 .../hbase/client/TestAsyncClientPushback.java      |  96 ++++++++++
 .../hadoop/hbase/client/TestClientPushback.java    | 204 +++++----------------
 11 files changed, 328 insertions(+), 315 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index f9bcf74..e429422 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -54,6 +54,8 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.MultiResponse.RegionResult;
 import 
org.apache.hadoop.hbase.client.RetriesExhaustedException.ThrowableWithExtraContext;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -134,6 +136,10 @@ class AsyncBatchRpcRetryingCaller<T> {
         () -> new RegionRequest(loc)).actions.add(action);
     }
 
+    public void setRegionRequest(byte[] regionName, RegionRequest regionReq) {
+      actionsByRegion.put(regionName, regionReq);
+    }
+
     public int getPriority() {
       return actionsByRegion.values().stream().flatMap(rr -> 
rr.actions.stream())
         .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET);
@@ -298,6 +304,8 @@ class AsyncBatchRpcRetryingCaller<T> {
 
   private void onComplete(Map<byte[], RegionRequest> actionsByRegion, int 
tries,
       ServerName serverName, MultiResponse resp) {
+    ConnectionUtils.updateStats(conn.getStatisticsTracker(), 
conn.getConnectionMetrics(),
+      serverName, resp);
     List<Action> failedActions = new ArrayList<>();
     MutableBoolean retryImmediately = new MutableBoolean(false);
     actionsByRegion.forEach((rn, regionReq) -> {
@@ -333,55 +341,88 @@ class AsyncBatchRpcRetryingCaller<T> {
     }
   }
 
-  private void send(Map<ServerName, ServerRequest> actionsByServer, int tries) 
{
+  private void sendToServer(ServerName serverName, ServerRequest serverReq, 
int tries) {
     long remainingNs;
     if (operationTimeoutNs > 0) {
       remainingNs = remainingTimeNs();
       if (remainingNs <= 0) {
-        failAll(actionsByServer.values().stream().flatMap(m -> 
m.actionsByRegion.values().stream())
-          .flatMap(r -> r.actions.stream()), tries);
+        failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> 
r.actions.stream()),
+          tries);
         return;
       }
     } else {
       remainingNs = Long.MAX_VALUE;
     }
-    actionsByServer.forEach((sn, serverReq) -> {
-      ClientService.Interface stub;
-      try {
-        stub = conn.getRegionServerStub(sn);
-      } catch (IOException e) {
-        onError(serverReq.actionsByRegion, tries, e, sn);
-        return;
-      }
-      ClientProtos.MultiRequest req;
-      List<CellScannable> cells = new ArrayList<>();
-      // Map from a created RegionAction to the original index for a 
RowMutations within
-      // the original list of actions. This will be used to process the 
results when there
-      // is RowMutations in the action list.
-      Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
-      try {
-        req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
-      } catch (IOException e) {
-        onError(serverReq.actionsByRegion, tries, e, sn);
-        return;
-      }
-      HBaseRpcController controller = 
conn.rpcControllerFactory.newController();
-      resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
-        calcPriority(serverReq.getPriority(), tableName));
-      if (!cells.isEmpty()) {
-        controller.setCellScanner(createCellScanner(cells));
+    ClientService.Interface stub;
+    try {
+      stub = conn.getRegionServerStub(serverName);
+    } catch (IOException e) {
+      onError(serverReq.actionsByRegion, tries, e, serverName);
+      return;
+    }
+    ClientProtos.MultiRequest req;
+    List<CellScannable> cells = new ArrayList<>();
+    // Map from a created RegionAction to the original index for a 
RowMutations within
+    // the original list of actions. This will be used to process the results 
when there
+    // is RowMutations in the action list.
+    Map<Integer, Integer> rowMutationsIndexMap = new HashMap<>();
+    try {
+      req = buildReq(serverReq.actionsByRegion, cells, rowMutationsIndexMap);
+    } catch (IOException e) {
+      onError(serverReq.actionsByRegion, tries, e, serverName);
+      return;
+    }
+    HBaseRpcController controller = conn.rpcControllerFactory.newController();
+    resetController(controller, Math.min(rpcTimeoutNs, remainingNs),
+      calcPriority(serverReq.getPriority(), tableName));
+    if (!cells.isEmpty()) {
+      controller.setCellScanner(createCellScanner(cells));
+    }
+    stub.multi(controller, req, resp -> {
+      if (controller.failed()) {
+        onError(serverReq.actionsByRegion, tries, controller.getFailed(), 
serverName);
+      } else {
+        try {
+          onComplete(serverReq.actionsByRegion, tries, serverName, 
ResponseConverter.getResults(req,
+            rowMutationsIndexMap, resp, controller.cellScanner()));
+        } catch (Exception e) {
+          onError(serverReq.actionsByRegion, tries, e, serverName);
+          return;
+        }
       }
-      stub.multi(controller, req, resp -> {
-        if (controller.failed()) {
-          onError(serverReq.actionsByRegion, tries, controller.getFailed(), 
sn);
+    });
+  }
+
+  // We will make use of the ServerStatisticTracker to determine whether we 
need to delay a bit,
+  // based on the load of the region server and the region.
+  private void sendOrDelay(Map<ServerName, ServerRequest> actionsByServer, int 
tries) {
+    Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
+    Optional<ServerStatisticTracker> optStats = conn.getStatisticsTracker();
+    if (!optStats.isPresent()) {
+      actionsByServer.forEach((serverName, serverReq) -> {
+        metrics.ifPresent(MetricsConnection::incrNormalRunners);
+        sendToServer(serverName, serverReq, tries);
+      });
+      return;
+    }
+    ServerStatisticTracker stats = optStats.get();
+    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
+    actionsByServer.forEach((serverName, serverReq) -> {
+      ServerStatistics serverStats = stats.getStats(serverName);
+      Map<Long, ServerRequest> groupByBackoff = new HashMap<>();
+      serverReq.actionsByRegion.forEach((regionName, regionReq) -> {
+        long backoff = backoffPolicy.getBackoffTime(serverName, regionName, 
serverStats);
+        groupByBackoff.computeIfAbsent(backoff, k -> new ServerRequest())
+          .setRegionRequest(regionName, regionReq);
+      });
+      groupByBackoff.forEach((backoff, sr) -> {
+        if (backoff > 0) {
+          metrics.ifPresent(m -> 
m.incrDelayRunnersAndUpdateDelayInterval(backoff));
+          retryTimer.newTimeout(timer -> sendToServer(serverName, sr, tries), 
backoff,
+            TimeUnit.MILLISECONDS);
         } else {
-          try {
-            onComplete(serverReq.actionsByRegion, tries, sn, 
ResponseConverter.getResults(req,
-              rowMutationsIndexMap, resp, controller.cellScanner()));
-          } catch (Exception e) {
-            onError(serverReq.actionsByRegion, tries, e, sn);
-            return;
-          }
+          metrics.ifPresent(MetricsConnection::incrNormalRunners);
+          sendToServer(serverName, sr, tries);
         }
       });
     });
@@ -454,7 +495,7 @@ class AsyncBatchRpcRetryingCaller<T> {
         }))
       .toArray(CompletableFuture[]::new)), (v, r) -> {
         if (!actionsByServer.isEmpty()) {
-          send(actionsByServer, tries);
+          sendOrDelay(actionsByServer, tries);
         }
         if (!locateFailed.isEmpty()) {
           tryResubmit(locateFailed.stream(), tries, false);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index f046e7a..7d59984 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.ChoreService;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -101,6 +103,9 @@ class AsyncConnectionImpl implements AsyncConnection {
   private final AtomicReference<CompletableFuture<MasterService.Interface>> 
masterStubMakeFuture =
     new AtomicReference<>();
 
+  private final Optional<ServerStatisticTracker> stats;
+  private final ClientBackoffPolicy backoffPolicy;
+
   private ChoreService authService;
 
   private volatile boolean closed = false;
@@ -133,6 +138,8 @@ class AsyncConnectionImpl implements AsyncConnection {
     } else {
       nonceGenerator = NO_NONCE_GENERATOR;
     }
+    this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
+    this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
   }
 
   private void spawnRenewalChore(final UserGroupInformation user) {
@@ -233,6 +240,14 @@ class AsyncConnectionImpl implements AsyncConnection {
     masterStub.compareAndSet(stub, null);
   }
 
+  Optional<ServerStatisticTracker> getStatisticsTracker() {
+    return stats;
+  }
+
+  ClientBackoffPolicy getBackoffPolicy() {
+    return backoffPolicy;
+  }
+
   @Override
   public AsyncTableBuilder<AdvancedScanResultConsumer> 
getTableBuilder(TableName tableName) {
     return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, 
connConf) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 525033d..e46a50e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -28,6 +28,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -55,9 +56,6 @@ import org.slf4j.LoggerFactory;
 
 import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-
 /**
  * The context, and return value, for a single submit/submitAll call.
  * Note on how this class (one AP submit) works. Initially, all requests are 
split into groups
@@ -614,8 +612,8 @@ class AsyncRequestFutureImpl<CResult> implements 
AsyncRequestFuture {
         traceText = "AsyncProcess.clientBackoff.sendMultiAction";
         runnable = runner;
         if (asyncProcess.connection.getConnectionMetrics() != null) {
-          asyncProcess.connection.getConnectionMetrics().incrDelayRunners();
-          
asyncProcess.connection.getConnectionMetrics().updateDelayInterval(runner.getSleepTime());
+          asyncProcess.connection.getConnectionMetrics()
+            .incrDelayRunnersAndUpdateDelayInterval(runner.getSleepTime());
         }
       } else {
         if (asyncProcess.connection.getConnectionMetrics() != null) {
@@ -802,19 +800,16 @@ class AsyncRequestFutureImpl<CResult> implements 
AsyncRequestFuture {
    * @param responses      - the response, if any
    * @param numAttempt     - the attempt
    */
-  private void receiveMultiAction(MultiAction multiAction,
-                                  ServerName server, MultiResponse responses, 
int numAttempt) {
+  private void receiveMultiAction(MultiAction multiAction, ServerName server,
+      MultiResponse responses, int numAttempt) {
     assert responses != null;
-
-    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
-    updateStats(server, results);
-
+    updateStats(server, responses);
     // Success or partial success
     // Analyze detailed results. We can still have individual failures to be 
redo.
     // two specific throwables are managed:
     //  - DoNotRetryIOException: we continue to retry for other actions
     //  - RegionMovedException: we update the cache with the new region 
location
-
+    Map<byte[], MultiResponse.RegionResult> results = responses.getResults();
     List<Action> toReplay = new ArrayList<>();
     Throwable lastException = null;
     int failureCount = 0;
@@ -926,26 +921,9 @@ class AsyncRequestFutureImpl<CResult> implements 
AsyncRequestFuture {
   }
 
   @VisibleForTesting
-  protected void updateStats(ServerName server, Map<byte[], 
MultiResponse.RegionResult> results) {
-    boolean metrics = asyncProcess.connection.getConnectionMetrics() != null;
-    boolean stats = asyncProcess.connection.getStatisticsTracker() != null;
-    if (!stats && !metrics) {
-      return;
-    }
-    for (Map.Entry<byte[], MultiResponse.RegionResult> regionStats : 
results.entrySet()) {
-      byte[] regionName = regionStats.getKey();
-      ClientProtos.RegionLoadStats stat = regionStats.getValue().getStat();
-      if (stat == null) {
-        LOG.error("No ClientProtos.RegionLoadStats found for server=" + server
-          + ", region=" + Bytes.toStringBinary(regionName));
-        continue;
-      }
-      RegionLoadStats regionLoadstats = 
ProtobufUtil.createRegionLoadStats(stat);
-      
ResultStatsUtil.updateStats(asyncProcess.connection.getStatisticsTracker(), 
server,
-          regionName, regionLoadstats);
-      
ResultStatsUtil.updateStats(asyncProcess.connection.getConnectionMetrics(),
-          server, regionName, regionLoadstats);
-    }
+  protected void updateStats(ServerName server, MultiResponse resp) {
+    
ConnectionUtils.updateStats(Optional.ofNullable(asyncProcess.connection.getStatisticsTracker()),
+      Optional.ofNullable(asyncProcess.connection.getConnectionMetrics()), 
server, resp);
   }
 
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 6b06a7f..4a2fa3a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -62,8 +62,10 @@ import 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hbase.thirdparty.io.netty.util.Timer;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -672,4 +674,25 @@ public final class ConnectionUtils {
       }
     }
   }
+
+  static void updateStats(Optional<ServerStatisticTracker> optStats,
+      Optional<MetricsConnection> optMetrics, ServerName serverName, 
MultiResponse resp) {
+    if (!optStats.isPresent() && !optMetrics.isPresent()) {
+      // ServerStatisticTracker and MetricsConnection are both not present, 
just return
+      return;
+    }
+    resp.getResults().forEach((regionName, regionResult) -> {
+      ClientProtos.RegionLoadStats stat = regionResult.getStat();
+      if (stat == null) {
+        LOG.error("No ClientProtos.RegionLoadStats found for server={}, 
region={}", serverName,
+          Bytes.toStringBinary(regionName));
+        return;
+      }
+      RegionLoadStats regionLoadStats = 
ProtobufUtil.createRegionLoadStats(stat);
+      optStats.ifPresent(
+        stats -> ResultStatsUtil.updateStats(stats, serverName, regionName, 
regionLoadStats));
+      optMetrics.ifPresent(
+        metrics -> ResultStatsUtil.updateStats(metrics, serverName, 
regionName, regionLoadStats));
+    });
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index c62a712..d842f90 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -421,13 +421,9 @@ public class MetricsConnection implements 
StatisticTrackable {
     this.runnerStats.incrNormalRunners();
   }
 
-  /** Increment the number of delay runner counts. */
-  public void incrDelayRunners() {
+  /** Increment the number of delay runner counts and update delay interval of 
delay runner. */
+  public void incrDelayRunnersAndUpdateDelayInterval(long interval) {
     this.runnerStats.incrDelayRunners();
-  }
-
-  /** Update delay interval of delay runner. */
-  public void updateDelayInterval(long interval) {
     this.runnerStats.updateDelayInterval(interval);
   }
 
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 688c86f..6b87653 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -357,8 +357,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
       preCheck();
       return RawAsyncTableImpl.this
         .<Boolean> newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs)
-        .action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> 
mutateRow(controller, loc,
-          stub, mutation,
+        .action((controller, loc, stub) -> RawAsyncTableImpl.this.<Boolean> 
mutateRow(controller,
+          loc, stub, mutation,
           (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, 
qualifier,
             new BinaryComparator(value), CompareType.valueOf(op.name()), 
timeRange, rm),
           resp -> resp.getExists()))
@@ -373,7 +373,7 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
 
   // We need the MultiRequest when constructing the 
org.apache.hadoop.hbase.client.MultiResponse,
   // so here I write a new method as I do not want to change the abstraction 
of call method.
-  private static <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController 
controller,
+  private <RESP> CompletableFuture<RESP> mutateRow(HBaseRpcController 
controller,
       HRegionLocation loc, ClientService.Interface stub, RowMutations mutation,
       Converter<MultiRequest, byte[], RowMutations> reqConvert,
       Function<Result, RESP> respConverter) {
@@ -391,6 +391,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
             try {
               org.apache.hadoop.hbase.client.MultiResponse multiResp =
                 ResponseConverter.getResults(req, resp, 
controller.cellScanner());
+              ConnectionUtils.updateStats(conn.getStatisticsTracker(), 
conn.getConnectionMetrics(),
+                loc.getServerName(), multiResp);
               Throwable ex = multiResp.getException(regionName);
               if (ex != null) {
                 future.completeExceptionally(ex instanceof IOException ? ex
@@ -415,8 +417,8 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
   @Override
   public CompletableFuture<Void> mutateRow(RowMutations mutation) {
     return this.<Void> newCaller(mutation.getRow(), mutation.getMaxPriority(), 
writeRpcTimeoutNs)
-      .action((controller, loc, stub) -> RawAsyncTableImpl.<Void> 
mutateRow(controller, loc, stub,
-        mutation, (rn, rm) -> {
+      .action((controller, loc, stub) -> this.<Void> mutateRow(controller, 
loc, stub, mutation,
+        (rn, rm) -> {
           RegionAction.Builder regionMutationBuilder = 
RequestConverter.buildRegionAction(rn, rm);
           regionMutationBuilder.setAtomic(true);
           return 
MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
index c5c7375..12e3e3b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ServerStatisticTracker.java
@@ -19,15 +19,12 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
 
-import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 import java.util.concurrent.ConcurrentHashMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * Tracks the statistics for multiple regions
@@ -53,9 +50,4 @@ public class ServerStatisticTracker implements 
StatisticTrackable {
     }
     return new ServerStatisticTracker();
   }
-
-  @VisibleForTesting
-  ServerStatistics getServerStatsForTesting(ServerName server) {
-    return stats.get(server);
-  }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 71b21ac..81dcc46 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -271,7 +271,7 @@ public class TestAsyncProcess {
     }
 
     @Override
-    protected void updateStats(ServerName server, Map<byte[], 
MultiResponse.RegionResult> results) {
+    protected void updateStats(ServerName server, MultiResponse resp) {
       // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
     }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java
similarity index 64%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java
index ae959fa..a7202b8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ClientPushbackTestBase.java
@@ -24,13 +24,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
@@ -38,40 +35,31 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
 import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.junit.Test;
-import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test that we can actually send and use region metrics to slowdown client 
writes
  */
-@Category(MediumTests.class)
-public class TestClientPushback {
+public abstract class ClientPushbackTestBase {
 
-  @ClassRule
-  public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestClientPushback.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(ClientPushbackTestBase.class);
+  protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestClientPushback.class);
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
-
-  private static final TableName tableName = 
TableName.valueOf("client-pushback");
+  protected static final TableName tableName = 
TableName.valueOf("client-pushback");
   private static final byte[] family = Bytes.toBytes("f");
   private static final byte[] qualifier = Bytes.toBytes("q");
   private static final long flushSizeBytes = 512;
 
   @BeforeClass
-  public static void setupCluster() throws Exception{
+  public static void setupCluster() throws Exception {
     Configuration conf = UTIL.getConfiguration();
     // enable backpressure
     conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
@@ -82,52 +70,59 @@ public class TestClientPushback {
     // load
     conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
     // ensure we block the flushes when we are double that flushsize
-    conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
+    conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER,
+      HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
     conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
     UTIL.startMiniCluster(1);
     UTIL.createTable(tableName, family);
   }
 
   @AfterClass
-  public static void teardownCluster() throws Exception{
+  public static void cleanupCluster() throws Exception {
     UTIL.shutdownMiniCluster();
   }
 
-  @Test
-  public void testClientTracksServerPushback() throws Exception{
-    Configuration conf = UTIL.getConfiguration();
+  protected abstract ClientBackoffPolicy getBackoffPolicy() throws IOException;
+
+  protected abstract ServerStatisticTracker getStatisticsTracker() throws 
IOException;
+
+  protected abstract MetricsConnection getConnectionMetrics() throws 
IOException;
 
-    ClusterConnection conn = (ClusterConnection) 
ConnectionFactory.createConnection(conf);
-    BufferedMutatorImpl mutator = (BufferedMutatorImpl) 
conn.getBufferedMutator(tableName);
+  protected abstract void mutate(Put put) throws IOException;
 
+  protected abstract void mutate(Put put, AtomicLong endTime, CountDownLatch 
latch)
+      throws IOException;
+
+  protected abstract void mutateRow(RowMutations mutations) throws IOException;
+
+  @Test
+  public void testClientTracksServerPushback() throws Exception {
     HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
     Region region = rs.getRegions(tableName).get(0);
 
-    LOG.debug("Writing some data to "+tableName);
+    LOG.debug("Writing some data to " + tableName);
     // write some data
     Put p = new Put(Bytes.toBytes("row"));
     p.addColumn(family, qualifier, Bytes.toBytes("value1"));
-    mutator.mutate(p);
-    mutator.flush();
+    mutate(p);
 
     // get the current load on RS. Hopefully memstore isn't flushed since we 
wrote the the data
-    int load = (int) ((region.getMemStoreHeapSize() * 100)
-        / flushSizeBytes);
-    LOG.debug("Done writing some data to "+tableName);
+    int load = (int) ((region.getMemStoreHeapSize() * 100) / flushSizeBytes);
+    LOG.debug("Done writing some data to " + tableName);
 
     // get the stats for the region hosting our table
-    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
+    ClientBackoffPolicy backoffPolicy = getBackoffPolicy();
     assertTrue("Backoff policy is not correctly configured",
       backoffPolicy instanceof ExponentialClientBackoffPolicy);
 
-    ServerStatisticTracker stats = conn.getStatisticsTracker();
-    assertNotNull( "No stats configured for the client!", stats);
+    ServerStatisticTracker stats = getStatisticsTracker();
+    assertNotNull("No stats configured for the client!", stats);
     // get the names so we can query the stats
     ServerName server = rs.getServerName();
     byte[] regionName = region.getRegionInfo().getRegionName();
 
     // check to see we found some load on the memstore
-    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
+    ServerStatistics serverStats = stats.getStats(server);
     ServerStatistics.RegionStatistics regionStats = 
serverStats.getStatsForRegion(regionName);
     assertEquals("We did not find some load on the memstore", load,
       regionStats.getMemStoreLoadPercent());
@@ -137,45 +132,29 @@ public class TestClientPushback {
     LOG.debug("Backoff calculated for " + 
region.getRegionInfo().getRegionNameAsString() + " @ " +
       server + " is " + backoffTime);
 
-    // Reach into the connection and submit work directly to AsyncProcess so 
we can
-    // monitor how long the submission was delayed via a callback
-    List<Row> ops = new ArrayList<>(1);
-    ops.add(p);
-    final CountDownLatch latch = new CountDownLatch(1);
-    final AtomicLong endTime = new AtomicLong();
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicLong endTime = new AtomicLong();
     long startTime = EnvironmentEdgeManager.currentTime();
-    Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> 
{
-        endTime.set(EnvironmentEdgeManager.currentTime());
-        latch.countDown();
-    };
-    AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
-            .setPool(mutator.getPool())
-            .setTableName(tableName)
-            .setRowAccess(ops)
-            .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
-            
.setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
-            .setRpcTimeout(60 * 1000)
-            .build();
-    mutator.getAsyncProcess().submit(task);
+    mutate(p, endTime, latch);
     // Currently the ExponentialClientBackoffPolicy under these test conditions
     // produces a backoffTime of 151 milliseconds. This is long enough so the
     // wait and related checks below are reasonable. Revisit if the backoff
     // time reported by above debug logging has significantly deviated.
+    MetricsConnection metrics = getConnectionMetrics();
     String name = server.getServerName() + "," + 
Bytes.toStringBinary(regionName);
-    MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
-            serverStats.get(server).get(regionName);
+    MetricsConnection.RegionStats rsStats = 
metrics.serverStats.get(server).get(regionName);
     assertEquals(name, rsStats.name);
     assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
-        (double)regionStats.getHeapOccupancyPercent(), 0.1 );
+      (double) regionStats.getHeapOccupancyPercent(), 0.1);
     assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
-        (double)regionStats.getMemStoreLoadPercent(), 0.1);
+      (double) regionStats.getMemStoreLoadPercent(), 0.1);
 
-    MetricsConnection.RunnerStats runnerStats = 
conn.getConnectionMetrics().runnerStats;
+    MetricsConnection.RunnerStats runnerStats = metrics.runnerStats;
 
     assertEquals(1, runnerStats.delayRunners.getCount());
     assertEquals(1, runnerStats.normalRunners.getCount());
-    assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(),
-      (double)backoffTime, 0.1);
+    assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(), 
(double) backoffTime,
+      0.1);
 
     latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
     assertNotEquals("AsyncProcess did not submit the work time", 0, 
endTime.get());
@@ -184,9 +163,6 @@ public class TestClientPushback {
 
   @Test
   public void testMutateRowStats() throws IOException {
-    Configuration conf = UTIL.getConfiguration();
-    ClusterConnection conn = (ClusterConnection) 
ConnectionFactory.createConnection(conf);
-    Table table = conn.getTable(tableName);
     HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
     Region region = rs.getRegions(tableName).get(0);
 
@@ -194,19 +170,19 @@ public class TestClientPushback {
     Put p = new Put(Bytes.toBytes("row"));
     p.addColumn(family, qualifier, Bytes.toBytes("value2"));
     mutations.add(p);
-    table.mutateRow(mutations);
+    mutateRow(mutations);
 
-    ServerStatisticTracker stats = conn.getStatisticsTracker();
-    assertNotNull( "No stats configured for the client!", stats);
+    ServerStatisticTracker stats = getStatisticsTracker();
+    assertNotNull("No stats configured for the client!", stats);
     // get the names so we can query the stats
     ServerName server = rs.getServerName();
     byte[] regionName = region.getRegionInfo().getRegionName();
 
     // check to see we found some load on the memstore
-    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
+    ServerStatistics serverStats = stats.getStats(server);
     ServerStatistics.RegionStatistics regionStats = 
serverStats.getStatsForRegion(regionName);
 
     assertNotNull(regionStats);
     assertTrue(regionStats.getMemStoreLoadPercent() > 0);
-    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java
new file mode 100644
index 0000000..cc030d8
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPushback.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncClientPushback extends ClientPushbackTestBase { // async client variant
+  // This class declares no @Test methods of its own; it only overrides the base-class hooks.
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncClientPushback.class);
+
+  private AsyncConnectionImpl conn; // assigned in setUp(), closed in tearDown()
+
+  private AsyncBufferedMutator mutator; // obtained from conn for tableName
+
+  @Before
+  public void setUp() throws Exception { // fresh connection and mutator per test
+    conn =
+      (AsyncConnectionImpl) 
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+    mutator = conn.getBufferedMutator(tableName);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(mutator, true); // true = swallow an IOException thrown by close()
+    Closeables.close(conn, true); // mutator first, then the connection it came from
+  }
+
+  @Override
+  protected ClientBackoffPolicy getBackoffPolicy() throws IOException {
+    return conn.getBackoffPolicy();
+  }
+
+  @Override
+  protected ServerStatisticTracker getStatisticsTracker() throws IOException {
+    return conn.getStatisticsTracker().get(); // NOTE(review): presumably an Optional - confirm
+  }
+
+  @Override
+  protected MetricsConnection getConnectionMetrics() throws IOException {
+    return conn.getConnectionMetrics().get(); // NOTE(review): presumably an Optional - confirm
+  }
+
+  @Override
+  protected void mutate(Put put) throws IOException { // synchronous write: returns when done
+    CompletableFuture<?> future = mutator.mutate(put); // hand the put to the mutator
+    mutator.flush(); // ask the mutator to send what it has buffered
+    future.join(); // block until the put completes; failures surface unchecked
+  }
+
+  @Override
+  protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) 
throws IOException {
+    FutureUtils.addListener(mutator.mutate(put), (r, e) -> { // result and error both ignored
+      endTime.set(EnvironmentEdgeManager.currentTime()); // completion timestamp for the caller
+      latch.countDown(); // release whoever is waiting on the latch
+    });
+    mutator.flush(); // no join here: completion is observed through the latch
+  }
+
+  @Override
+  protected void mutateRow(RowMutations mutations) throws IOException {
+    conn.getTable(tableName).mutateRow(mutations).join(); // wait for the row mutation to finish
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
index ae959fa..e789349 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientPushback.java
@@ -17,196 +17,90 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static 
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
-import org.apache.hadoop.hbase.client.backoff.ExponentialClientBackoffPolicy;
-import org.apache.hadoop.hbase.client.backoff.ServerStatistics;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-/**
- * Test that we can actually send and use region metrics to slowdown client 
writes
- */
-@Category(MediumTests.class)
-public class TestClientPushback {
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestClientPushback extends ClientPushbackTestBase {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestClientPushback.class);
+    HBaseClassTestRule.forClass(TestClientPushback.class);
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(TestClientPushback.class);
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private ConnectionImplementation conn;
 
-  private static final TableName tableName = 
TableName.valueOf("client-pushback");
-  private static final byte[] family = Bytes.toBytes("f");
-  private static final byte[] qualifier = Bytes.toBytes("q");
-  private static final long flushSizeBytes = 512;
+  private BufferedMutatorImpl mutator;
 
-  @BeforeClass
-  public static void setupCluster() throws Exception{
-    Configuration conf = UTIL.getConfiguration();
-    // enable backpressure
-    conf.setBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, true);
-    // use the exponential backoff policy
-    conf.setClass(ClientBackoffPolicy.BACKOFF_POLICY_CLASS, 
ExponentialClientBackoffPolicy.class,
-      ClientBackoffPolicy.class);
-    // turn the memstore size way down so we don't need to write a lot to see 
changes in memstore
-    // load
-    conf.setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, flushSizeBytes);
-    // ensure we block the flushes when we are double that flushsize
-    conf.setLong(HConstants.HREGION_MEMSTORE_BLOCK_MULTIPLIER, 
HConstants.DEFAULT_HREGION_MEMSTORE_BLOCK_MULTIPLIER);
-    conf.setBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, true);
-    UTIL.startMiniCluster(1);
-    UTIL.createTable(tableName, family);
+  @Before
+  public void setUp() throws IOException {
+    conn = (ConnectionImplementation) 
ConnectionFactory.createConnection(UTIL.getConfiguration());
+    mutator = (BufferedMutatorImpl) conn.getBufferedMutator(tableName);
   }
 
-  @AfterClass
-  public static void teardownCluster() throws Exception{
-    UTIL.shutdownMiniCluster();
+  @After
+  public void tearDown() throws IOException {
+    Closeables.close(mutator, true);
+    Closeables.close(conn, true);
   }
 
-  @Test
-  public void testClientTracksServerPushback() throws Exception{
-    Configuration conf = UTIL.getConfiguration();
+  @Override
+  protected ClientBackoffPolicy getBackoffPolicy() throws IOException {
+    return conn.getBackoffPolicy();
+  }
 
-    ClusterConnection conn = (ClusterConnection) 
ConnectionFactory.createConnection(conf);
-    BufferedMutatorImpl mutator = (BufferedMutatorImpl) 
conn.getBufferedMutator(tableName);
+  @Override
+  protected ServerStatisticTracker getStatisticsTracker() throws IOException {
+    return conn.getStatisticsTracker();
+  }
 
-    HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
-    Region region = rs.getRegions(tableName).get(0);
+  @Override
+  protected MetricsConnection getConnectionMetrics() throws IOException {
+    return conn.getConnectionMetrics();
+  }
 
-    LOG.debug("Writing some data to "+tableName);
-    // write some data
-    Put p = new Put(Bytes.toBytes("row"));
-    p.addColumn(family, qualifier, Bytes.toBytes("value1"));
-    mutator.mutate(p);
+  @Override
+  protected void mutate(Put put) throws IOException {
+    mutator.mutate(put);
     mutator.flush();
+  }
 
-    // get the current load on RS. Hopefully memstore isn't flushed since we 
wrote the the data
-    int load = (int) ((region.getMemStoreHeapSize() * 100)
-        / flushSizeBytes);
-    LOG.debug("Done writing some data to "+tableName);
-
-    // get the stats for the region hosting our table
-    ClientBackoffPolicy backoffPolicy = conn.getBackoffPolicy();
-    assertTrue("Backoff policy is not correctly configured",
-      backoffPolicy instanceof ExponentialClientBackoffPolicy);
-
-    ServerStatisticTracker stats = conn.getStatisticsTracker();
-    assertNotNull( "No stats configured for the client!", stats);
-    // get the names so we can query the stats
-    ServerName server = rs.getServerName();
-    byte[] regionName = region.getRegionInfo().getRegionName();
-
-    // check to see we found some load on the memstore
-    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
-    ServerStatistics.RegionStatistics regionStats = 
serverStats.getStatsForRegion(regionName);
-    assertEquals("We did not find some load on the memstore", load,
-      regionStats.getMemStoreLoadPercent());
-    // check that the load reported produces a nonzero delay
-    long backoffTime = backoffPolicy.getBackoffTime(server, regionName, 
serverStats);
-    assertNotEquals("Reported load does not produce a backoff", 0, 
backoffTime);
-    LOG.debug("Backoff calculated for " + 
region.getRegionInfo().getRegionNameAsString() + " @ " +
-      server + " is " + backoffTime);
-
+  @Override
+  protected void mutate(Put put, AtomicLong endTime, CountDownLatch latch) 
throws IOException {
     // Reach into the connection and submit work directly to AsyncProcess so 
we can
     // monitor how long the submission was delayed via a callback
     List<Row> ops = new ArrayList<>(1);
-    ops.add(p);
-    final CountDownLatch latch = new CountDownLatch(1);
-    final AtomicLong endTime = new AtomicLong();
-    long startTime = EnvironmentEdgeManager.currentTime();
+    ops.add(put);
     Batch.Callback<Result> callback = (byte[] r, byte[] row, Result result) -> 
{
-        endTime.set(EnvironmentEdgeManager.currentTime());
-        latch.countDown();
+      endTime.set(EnvironmentEdgeManager.currentTime());
+      latch.countDown();
     };
-    AsyncProcessTask<Result> task = AsyncProcessTask.newBuilder(callback)
-            .setPool(mutator.getPool())
-            .setTableName(tableName)
-            .setRowAccess(ops)
-            .setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
-            
.setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
-            .setRpcTimeout(60 * 1000)
-            .build();
+    AsyncProcessTask<Result> task =
+      
AsyncProcessTask.newBuilder(callback).setPool(mutator.getPool()).setTableName(tableName)
+        
.setRowAccess(ops).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
+        
.setOperationTimeout(conn.getConnectionConfiguration().getOperationTimeout())
+        .setRpcTimeout(60 * 1000).build();
     mutator.getAsyncProcess().submit(task);
-    // Currently the ExponentialClientBackoffPolicy under these test conditions
-    // produces a backoffTime of 151 milliseconds. This is long enough so the
-    // wait and related checks below are reasonable. Revisit if the backoff
-    // time reported by above debug logging has significantly deviated.
-    String name = server.getServerName() + "," + 
Bytes.toStringBinary(regionName);
-    MetricsConnection.RegionStats rsStats = conn.getConnectionMetrics().
-            serverStats.get(server).get(regionName);
-    assertEquals(name, rsStats.name);
-    assertEquals(rsStats.heapOccupancyHist.getSnapshot().getMean(),
-        (double)regionStats.getHeapOccupancyPercent(), 0.1 );
-    assertEquals(rsStats.memstoreLoadHist.getSnapshot().getMean(),
-        (double)regionStats.getMemStoreLoadPercent(), 0.1);
-
-    MetricsConnection.RunnerStats runnerStats = 
conn.getConnectionMetrics().runnerStats;
-
-    assertEquals(1, runnerStats.delayRunners.getCount());
-    assertEquals(1, runnerStats.normalRunners.getCount());
-    assertEquals("", runnerStats.delayIntevalHist.getSnapshot().getMean(),
-      (double)backoffTime, 0.1);
-
-    latch.await(backoffTime * 2, TimeUnit.MILLISECONDS);
-    assertNotEquals("AsyncProcess did not submit the work time", 0, 
endTime.get());
-    assertTrue("AsyncProcess did not delay long enough", endTime.get() - 
startTime >= backoffTime);
   }
 
-  @Test
-  public void testMutateRowStats() throws IOException {
-    Configuration conf = UTIL.getConfiguration();
-    ClusterConnection conn = (ClusterConnection) 
ConnectionFactory.createConnection(conf);
-    Table table = conn.getTable(tableName);
-    HRegionServer rs = UTIL.getHBaseCluster().getRegionServer(0);
-    Region region = rs.getRegions(tableName).get(0);
-
-    RowMutations mutations = new RowMutations(Bytes.toBytes("row"));
-    Put p = new Put(Bytes.toBytes("row"));
-    p.addColumn(family, qualifier, Bytes.toBytes("value2"));
-    mutations.add(p);
-    table.mutateRow(mutations);
-
-    ServerStatisticTracker stats = conn.getStatisticsTracker();
-    assertNotNull( "No stats configured for the client!", stats);
-    // get the names so we can query the stats
-    ServerName server = rs.getServerName();
-    byte[] regionName = region.getRegionInfo().getRegionName();
-
-    // check to see we found some load on the memstore
-    ServerStatistics serverStats = stats.getServerStatsForTesting(server);
-    ServerStatistics.RegionStatistics regionStats = 
serverStats.getStatsForRegion(regionName);
-
-    assertNotNull(regionStats);
-    assertTrue(regionStats.getMemStoreLoadPercent() > 0);
+  @Override
+  protected void mutateRow(RowMutations mutations) throws IOException {
+    try (Table table = conn.getTable(tableName)) {
+      table.mutateRow(mutations);
     }
+  }
 }

Reply via email to