This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new a3d2a2d  HBASE-22244 Make use of MetricsConnection in async client
a3d2a2d is described below

commit a3d2a2df3a0837f39d586f5f2018fd630883fa10
Author: zhangduo <[email protected]>
AuthorDate: Wed Apr 17 21:45:54 2019 +0800

    HBASE-22244 Make use of MetricsConnection in async client
---
 .../hadoop/hbase/client/AsyncClientScanner.java    |  7 +++---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 16 ++++++++++++-
 .../hbase/client/AsyncMetaRegionLocator.java       |  3 ++-
 .../hbase/client/AsyncNonMetaRegionLocator.java    | 26 +++++++++++++++++++++-
 .../hbase/client/AsyncRegionLocatorHelper.java     |  5 ++++-
 .../hbase/client/ConnectionImplementation.java     | 18 +++++++--------
 .../hadoop/hbase/client/ConnectionUtils.java       | 26 ++++++++++++++--------
 .../hadoop/hbase/client/MetricsConnection.java     | 25 ++++++++++++---------
 .../hadoop/hbase/client/RawAsyncTableImpl.java     |  2 +-
 .../hadoop/hbase/client/TestMetricsConnection.java | 12 ++++------
 10 files changed, 95 insertions(+), 45 deletions(-)

diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index 6d4aefd..d6cca48 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -197,10 +197,9 @@ class AsyncClientScanner {
   private void openScanner() {
     incRegionCountMetrics(scanMetrics);
     openScannerTries.set(1);
-    addListener(
-      timelineConsistentRead(conn.getLocator(), tableName, scan, 
scan.getStartRow(),
-        getLocateType(scan), this::openScanner, rpcTimeoutNs, 
getPrimaryTimeoutNs(), retryTimer),
-      (resp, error) -> {
+    addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, 
scan.getStartRow(),
+      getLocateType(scan), this::openScanner, rpcTimeoutNs, 
getPrimaryTimeoutNs(), retryTimer,
+      conn.getConnectionMetrics()), (resp, error) -> {
         if (error != null) {
           consumer.onError(error);
           return;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index c972d11..f046e7a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client;
 
 import static 
org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
+import static 
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
 import static 
org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
 import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -103,6 +105,8 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private volatile boolean closed = false;
 
+  private final Optional<MetricsConnection> metrics;
+
   public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry, 
String clusterId,
       User user) {
     this.conf = conf;
@@ -112,7 +116,12 @@ class AsyncConnectionImpl implements AsyncConnection {
     }
     this.connConf = new AsyncConnectionConfiguration(conf);
     this.registry = registry;
-    this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
+    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
+      this.metrics = Optional.of(new MetricsConnection(this.toString(), () -> 
null, () -> null));
+    } else {
+      this.metrics = Optional.empty();
+    }
+    this.rpcClient = RpcClientFactory.createClient(conf, clusterId, 
metrics.orElse(null));
     this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, 
true);
     this.rpcTimeout =
@@ -148,6 +157,7 @@ class AsyncConnectionImpl implements AsyncConnection {
     if (authService != null) {
       authService.shutdown();
     }
+    metrics.ifPresent(MetricsConnection::shutdown);
     closed = true;
   }
 
@@ -312,4 +322,8 @@ class AsyncConnectionImpl implements AsyncConnection {
   public void clearRegionLocationCache() {
     locator.clearCache();
   }
+
+  Optional<MetricsConnection> getConnectionMetrics() {
+    return metrics;
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index fa08795..54bf9ff 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -23,6 +23,7 @@ import static 
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
 import static 
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
 import static 
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.hbase.HRegionLocation;
@@ -106,7 +107,7 @@ class AsyncMetaRegionLocator {
 
   void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
     AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, 
this::getCacheLocation,
-      this::addLocationToCache, this::removeLocationFromCache);
+      this::addLocationToCache, this::removeLocationFromCache, 
Optional.empty());
   }
 
   void clearCache() {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 7f25708..bbb84d0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -338,15 +338,25 @@ class AsyncNonMetaRegionLocator {
     return true;
   }
 
+  private void recordCacheHit() {
+    conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
+  }
+
+  private void recordCacheMiss() {
+    
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
+  }
+
   private RegionLocations locateRowInCache(TableCache tableCache, TableName 
tableName, byte[] row,
       int replicaId) {
     Map.Entry<byte[], RegionLocations> entry = 
tableCache.cache.floorEntry(row);
     if (entry == null) {
+      recordCacheMiss();
       return null;
     }
     RegionLocations locs = entry.getValue();
     HRegionLocation loc = locs.getRegionLocation(replicaId);
     if (loc == null) {
+      recordCacheMiss();
       return null;
     }
     byte[] endKey = loc.getRegion().getEndKey();
@@ -355,8 +365,10 @@ class AsyncNonMetaRegionLocator {
         LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
           Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
       }
+      recordCacheHit();
       return locs;
     } else {
+      recordCacheMiss();
       return null;
     }
   }
@@ -367,11 +379,13 @@ class AsyncNonMetaRegionLocator {
     Map.Entry<byte[], RegionLocations> entry =
       isEmptyStopRow ? tableCache.cache.lastEntry() : 
tableCache.cache.lowerEntry(row);
     if (entry == null) {
+      recordCacheMiss();
       return null;
     }
     RegionLocations locs = entry.getValue();
     HRegionLocation loc = locs.getRegionLocation(replicaId);
     if (loc == null) {
+      recordCacheMiss();
       return null;
     }
     if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
@@ -380,8 +394,10 @@ class AsyncNonMetaRegionLocator {
         LOG.trace("Found {} in cache for {}, row='{}', locateType={}, 
replicaId={}", loc, tableName,
           Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
       }
+      recordCacheHit();
       return locs;
     } else {
+      recordCacheMiss();
       return null;
     }
   }
@@ -529,6 +545,10 @@ class AsyncNonMetaRegionLocator {
     return getRegionLocationsInternal(tableName, row, replicaId, locateType, 
reload);
   }
 
+  private void recordClearRegionCache() {
+    
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
+  }
+
   private void removeLocationFromCache(HRegionLocation loc) {
     TableCache tableCache = cache.get(loc.getRegion().getTable());
     if (tableCache == null) {
@@ -544,10 +564,12 @@ class AsyncNonMetaRegionLocator {
       RegionLocations newLocs = removeRegionLocation(oldLocs, 
loc.getRegion().getReplicaId());
       if (newLocs == null) {
         if (tableCache.cache.remove(startKey, oldLocs)) {
+          recordClearRegionCache();
           return;
         }
       } else {
         if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
+          recordClearRegionCache();
           return;
         }
       }
@@ -569,7 +591,7 @@ class AsyncNonMetaRegionLocator {
 
   void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
     AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception, 
this::getCachedLocation,
-      this::addLocationToCache, this::removeLocationFromCache);
+      this::addLocationToCache, this::removeLocationFromCache, 
conn.getConnectionMetrics());
   }
 
   void clearCache(TableName tableName) {
@@ -583,6 +605,8 @@ class AsyncNonMetaRegionLocator {
         tableCache.allRequests.values().forEach(f -> 
f.completeExceptionally(error));
       }
     }
+    conn.getConnectionMetrics()
+      .ifPresent(metrics -> 
metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
   }
 
   void clearCache() {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index 4dde1bb..2836e4b 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -21,6 +21,7 @@ import static 
org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept
 import static 
org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
 
 import java.util.Arrays;
+import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import org.apache.commons.lang3.ObjectUtils;
@@ -51,7 +52,8 @@ final class AsyncRegionLocatorHelper {
 
   static void updateCachedLocationOnError(HRegionLocation loc, Throwable 
exception,
       Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
-      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> 
removeFromCache) {
+      Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation> 
removeFromCache,
+      Optional<MetricsConnection> metrics) {
     HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -78,6 +80,7 @@ final class AsyncRegionLocatorHelper {
       addToCache.accept(newLoc);
     } else {
       LOG.debug("Try removing {} from cache", loc);
+      metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
       removeFromCache.accept(loc);
     }
   }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index f5b0b03..2954e04 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -190,10 +190,10 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
 
   // thread executor shared by all Table instances created
   // by this connection
-  private volatile ExecutorService batchPool = null;
+  private volatile ThreadPoolExecutor batchPool = null;
   // meta thread executor shared by all Table instances created
   // by this connection
-  private volatile ExecutorService metaLookupPool = null;
+  private volatile ThreadPoolExecutor metaLookupPool = null;
   private volatile boolean cleanupPool = false;
 
   private final Configuration conf;
@@ -238,14 +238,13 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
    * constructor
    * @param conf Configuration object
    */
-  ConnectionImplementation(Configuration conf,
-                           ExecutorService pool, User user) throws IOException 
{
+  ConnectionImplementation(Configuration conf, ExecutorService pool, User 
user) throws IOException {
     this.conf = conf;
     this.user = user;
     if (user != null && user.isLoginFromKeytab()) {
       spawnRenewalChore(user.getUGI());
     }
-    this.batchPool = pool;
+    this.batchPool = (ThreadPoolExecutor) pool;
     this.connectionConfig = new ConnectionConfiguration(conf);
     this.closed = false;
     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
@@ -286,7 +285,8 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
     this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory, 
rpcControllerFactory);
     if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
-      this.metrics = new MetricsConnection(this);
+      this.metrics =
+        new MetricsConnection(this.toString(), this::getBatchPool, 
this::getMetaLookupPool);
     } else {
       this.metrics = null;
     }
@@ -461,7 +461,7 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
     return this.metrics;
   }
 
-  private ExecutorService getBatchPool() {
+  private ThreadPoolExecutor getBatchPool() {
     if (batchPool == null) {
       synchronized (this) {
         if (batchPool == null) {
@@ -474,7 +474,7 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
     return this.batchPool;
   }
 
-  private ExecutorService getThreadPool(int maxThreads, int coreThreads, 
String nameHint,
+  private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads, 
String nameHint,
       BlockingQueue<Runnable> passedWorkQueue) {
     // shared HTable thread executor not yet initialized
     if (maxThreads == 0) {
@@ -503,7 +503,7 @@ class ConnectionImplementation implements 
ClusterConnection, Closeable {
     return tpe;
   }
 
-  private ExecutorService getMetaLookupPool() {
+  private ThreadPoolExecutor getMetaLookupPool() {
     if (this.metaLookupPool == null) {
       synchronized (this) {
         if (this.metaLookupPool == null) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 101dda0..6b06a7f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -28,6 +28,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadLocalRandom;
@@ -500,13 +501,19 @@ public final class ConnectionUtils {
   /**
    * Connect the two futures, if the src future is done, then mark the dst 
future as done. And if
    * the dst future is done, then cancel the src future. This is used for 
timeline consistent read.
+   * <p/>
+   * Pass empty metrics if you want to link the primary future and the dst 
future so we will not
+   * increase the hedge read related metrics.
    */
-  private static <T> void connect(CompletableFuture<T> srcFuture, 
CompletableFuture<T> dstFuture) {
+  private static <T> void connect(CompletableFuture<T> srcFuture, 
CompletableFuture<T> dstFuture,
+      Optional<MetricsConnection> metrics) {
     addListener(srcFuture, (r, e) -> {
       if (e != null) {
         dstFuture.completeExceptionally(e);
       } else {
-        dstFuture.complete(r);
+        if (dstFuture.complete(r)) {
+          metrics.ifPresent(MetricsConnection::incrHedgedReadWin);
+        }
       }
     });
     // The cancellation may be a dummy one as the dstFuture may be completed 
by this srcFuture.
@@ -519,7 +526,7 @@ public final class ConnectionUtils {
 
   private static <T> void sendRequestsToSecondaryReplicas(
       Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations 
locs,
-      CompletableFuture<T> future) {
+      CompletableFuture<T> future, Optional<MetricsConnection> metrics) {
     if (future.isDone()) {
       // do not send requests to secondary replicas if the future is done, 
i.e, the primary request
       // has already been finished.
@@ -527,14 +534,15 @@ public final class ConnectionUtils {
     }
     for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
       CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
-      connect(secondaryFuture, future);
+      metrics.ifPresent(MetricsConnection::incrHedgedReadOps);
+      connect(secondaryFuture, future, metrics);
     }
   }
 
   static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator 
locator,
       TableName tableName, Query query, byte[] row, RegionLocateType 
locateType,
       Function<Integer, CompletableFuture<T>> requestReplica, long 
rpcTimeoutNs,
-      long primaryCallTimeoutNs, Timer retryTimer) {
+      long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection> 
metrics) {
     if (query.getConsistency() == Consistency.STRONG) {
       return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
     }
@@ -545,7 +553,7 @@ public final class ConnectionUtils {
     // Timeline consistent read, where we may send requests to other region 
replicas
     CompletableFuture<T> primaryFuture = 
requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
     CompletableFuture<T> future = new CompletableFuture<>();
-    connect(primaryFuture, future);
+    connect(primaryFuture, future, Optional.empty());
     long startNs = System.nanoTime();
     // after the getRegionLocations, all the locations for the replicas of 
this region should have
     // been cached, so it is not big deal to locate them again when actually 
sending requests to
@@ -567,11 +575,11 @@ public final class ConnectionUtils {
         }
         long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
         if (delayNs <= 0) {
-          sendRequestsToSecondaryReplicas(requestReplica, locs, future);
+          sendRequestsToSecondaryReplicas(requestReplica, locs, future, 
metrics);
         } else {
           retryTimer.newTimeout(
-            timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, 
future), delayNs,
-            TimeUnit.NANOSECONDS);
+            timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, 
future, metrics),
+            delayNs, TimeUnit.NANOSECONDS);
         }
       });
     return future;
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index ac1a02a..c62a712 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-
+import java.util.function.Supplier;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 import 
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -305,30 +305,30 @@ public class MetricsConnection implements 
StatisticTrackable {
   private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
     new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
 
-  MetricsConnection(final ConnectionImplementation conn) {
-    this.scope = conn.toString();
+  MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
+      Supplier<ThreadPoolExecutor> metaPool) {
+    this.scope = scope;
     this.registry = new MetricRegistry();
-
     this.registry.register(getExecutorPoolName(),
         new RatioGauge() {
           @Override
           protected Ratio getRatio() {
-            ThreadPoolExecutor batchPool = (ThreadPoolExecutor) 
conn.getCurrentBatchPool();
-            if (batchPool == null) {
+            ThreadPoolExecutor pool = batchPool.get();
+            if (pool == null) {
               return Ratio.of(0, 0);
             }
-            return Ratio.of(batchPool.getActiveCount(), 
batchPool.getMaximumPoolSize());
+            return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
           }
         });
     this.registry.register(getMetaPoolName(),
         new RatioGauge() {
           @Override
           protected Ratio getRatio() {
-            ThreadPoolExecutor metaPool = (ThreadPoolExecutor) 
conn.getCurrentMetaLookupPool();
-            if (metaPool == null) {
+            ThreadPoolExecutor pool = metaPool.get();
+            if (pool == null) {
               return Ratio.of(0, 0);
             }
-            return Ratio.of(metaPool.getActiveCount(), 
metaPool.getMaximumPoolSize());
+            return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
           }
         });
     this.metaCacheHits = registry.counter(name(this.getClass(), 
"metaCacheHits", scope));
@@ -401,6 +401,11 @@ public class MetricsConnection implements 
StatisticTrackable {
     metaCacheNumClearRegion.inc();
   }
 
+  /** Increment the number of meta cache drops requested for individual 
region. */
+  public void incrMetaCacheNumClearRegion(int count) {
+    metaCacheNumClearRegion.inc(count);
+  }
+
   /** Increment the number of hedged read that have occurred. */
   public void incrHedgedReadOps() {
     hedgedReadOps.inc();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 86f11b9..688c86f 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -232,7 +232,7 @@ class RawAsyncTableImpl implements 
AsyncTable<AdvancedScanResultConsumer> {
   public CompletableFuture<Result> get(Get get) {
     return timelineConsistentRead(conn.getLocator(), tableName, get, 
get.getRow(),
       RegionLocateType.CURRENT, replicaId -> get(get, replicaId), 
readRpcTimeoutNs,
-      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer);
+      conn.connConf.getPrimaryCallTimeoutNs(), retryTimer, 
conn.getConnectionMetrics());
   }
 
   @Override
diff --git 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
index 97a672d..bfbaf97 100644
--- 
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
+++ 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
@@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals;
 import com.codahale.metrics.RatioGauge;
 import com.codahale.metrics.RatioGauge.Ratio;
 import java.io.IOException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MetricsTests;
@@ -35,7 +34,6 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 
 import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
 
@@ -57,13 +55,11 @@ public class TestMetricsConnection {
       HBaseClassTestRule.forClass(TestMetricsConnection.class);
 
   private static MetricsConnection METRICS;
-  private static final ExecutorService BATCH_POOL = 
Executors.newFixedThreadPool(2);
+  private static final ThreadPoolExecutor BATCH_POOL =
+    (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
   @BeforeClass
   public static void beforeClass() {
-    ConnectionImplementation mocked = 
Mockito.mock(ConnectionImplementation.class);
-    Mockito.when(mocked.toString()).thenReturn("mocked-connection");
-    Mockito.when(mocked.getCurrentBatchPool()).thenReturn(BATCH_POOL);
-    METRICS = new MetricsConnection(mocked);
+    METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, () 
-> null);
   }
 
   @AfterClass

Reply via email to