This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new a3d2a2d HBASE-22244 Make use of MetricsConnection in async client
a3d2a2d is described below
commit a3d2a2df3a0837f39d586f5f2018fd630883fa10
Author: zhangduo <[email protected]>
AuthorDate: Wed Apr 17 21:45:54 2019 +0800
HBASE-22244 Make use of MetricsConnection in async client
---
.../hadoop/hbase/client/AsyncClientScanner.java | 7 +++---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 16 ++++++++++++-
.../hbase/client/AsyncMetaRegionLocator.java | 3 ++-
.../hbase/client/AsyncNonMetaRegionLocator.java | 26 +++++++++++++++++++++-
.../hbase/client/AsyncRegionLocatorHelper.java | 5 ++++-
.../hbase/client/ConnectionImplementation.java | 18 +++++++--------
.../hadoop/hbase/client/ConnectionUtils.java | 26 ++++++++++++++--------
.../hadoop/hbase/client/MetricsConnection.java | 25 ++++++++++++---------
.../hadoop/hbase/client/RawAsyncTableImpl.java | 2 +-
.../hadoop/hbase/client/TestMetricsConnection.java | 12 ++++------
10 files changed, 95 insertions(+), 45 deletions(-)
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index 6d4aefd..d6cca48 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -197,10 +197,9 @@ class AsyncClientScanner {
private void openScanner() {
incRegionCountMetrics(scanMetrics);
openScannerTries.set(1);
- addListener(
- timelineConsistentRead(conn.getLocator(), tableName, scan,
scan.getStartRow(),
- getLocateType(scan), this::openScanner, rpcTimeoutNs,
getPrimaryTimeoutNs(), retryTimer),
- (resp, error) -> {
+ addListener(timelineConsistentRead(conn.getLocator(), tableName, scan,
scan.getStartRow(),
+ getLocateType(scan), this::openScanner, rpcTimeoutNs,
getPrimaryTimeoutNs(), retryTimer,
+ conn.getConnectionMetrics()), (resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index c972d11..f046e7a 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -19,10 +19,12 @@ package org.apache.hadoop.hbase.client;
import static
org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
+import static
org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
import static
org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -103,6 +105,8 @@ class AsyncConnectionImpl implements AsyncConnection {
private volatile boolean closed = false;
+ private final Optional<MetricsConnection> metrics;
+
public AsyncConnectionImpl(Configuration conf, AsyncRegistry registry,
String clusterId,
User user) {
this.conf = conf;
@@ -112,7 +116,12 @@ class AsyncConnectionImpl implements AsyncConnection {
}
this.connConf = new AsyncConnectionConfiguration(conf);
this.registry = registry;
- this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
+ if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
+ this.metrics = Optional.of(new MetricsConnection(this.toString(), () ->
null, () -> null));
+ } else {
+ this.metrics = Optional.empty();
+ }
+ this.rpcClient = RpcClientFactory.createClient(conf, clusterId,
metrics.orElse(null));
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY,
true);
this.rpcTimeout =
@@ -148,6 +157,7 @@ class AsyncConnectionImpl implements AsyncConnection {
if (authService != null) {
authService.shutdown();
}
+ metrics.ifPresent(MetricsConnection::shutdown);
closed = true;
}
@@ -312,4 +322,8 @@ class AsyncConnectionImpl implements AsyncConnection {
public void clearRegionLocationCache() {
locator.clearCache();
}
+
+ Optional<MetricsConnection> getConnectionMetrics() {
+ return metrics;
+ }
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
index fa08795..54bf9ff 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMetaRegionLocator.java
@@ -23,6 +23,7 @@ import static
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.isGood;
import static
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.removeRegionLocation;
import static
org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.replaceRegionLocation;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -106,7 +107,7 @@ class AsyncMetaRegionLocator {
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception,
this::getCacheLocation,
- this::addLocationToCache, this::removeLocationFromCache);
+ this::addLocationToCache, this::removeLocationFromCache,
Optional.empty());
}
void clearCache() {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
index 7f25708..bbb84d0 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java
@@ -338,15 +338,25 @@ class AsyncNonMetaRegionLocator {
return true;
}
+ private void recordCacheHit() {
+ conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheHit);
+ }
+
+ private void recordCacheMiss() {
+
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheMiss);
+ }
+
private RegionLocations locateRowInCache(TableCache tableCache, TableName
tableName, byte[] row,
int replicaId) {
Map.Entry<byte[], RegionLocations> entry =
tableCache.cache.floorEntry(row);
if (entry == null) {
+ recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
+ recordCacheMiss();
return null;
}
byte[] endKey = loc.getRegion().getEndKey();
@@ -355,8 +365,10 @@ class AsyncNonMetaRegionLocator {
LOG.trace("Found {} in cache for {}, row='{}', locateType={},
replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.CURRENT, replicaId);
}
+ recordCacheHit();
return locs;
} else {
+ recordCacheMiss();
return null;
}
}
@@ -367,11 +379,13 @@ class AsyncNonMetaRegionLocator {
Map.Entry<byte[], RegionLocations> entry =
isEmptyStopRow ? tableCache.cache.lastEntry() :
tableCache.cache.lowerEntry(row);
if (entry == null) {
+ recordCacheMiss();
return null;
}
RegionLocations locs = entry.getValue();
HRegionLocation loc = locs.getRegionLocation(replicaId);
if (loc == null) {
+ recordCacheMiss();
return null;
}
if (isEmptyStopRow(loc.getRegion().getEndKey()) ||
@@ -380,8 +394,10 @@ class AsyncNonMetaRegionLocator {
LOG.trace("Found {} in cache for {}, row='{}', locateType={},
replicaId={}", loc, tableName,
Bytes.toStringBinary(row), RegionLocateType.BEFORE, replicaId);
}
+ recordCacheHit();
return locs;
} else {
+ recordCacheMiss();
return null;
}
}
@@ -529,6 +545,10 @@ class AsyncNonMetaRegionLocator {
return getRegionLocationsInternal(tableName, row, replicaId, locateType,
reload);
}
+ private void recordClearRegionCache() {
+
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearRegion);
+ }
+
private void removeLocationFromCache(HRegionLocation loc) {
TableCache tableCache = cache.get(loc.getRegion().getTable());
if (tableCache == null) {
@@ -544,10 +564,12 @@ class AsyncNonMetaRegionLocator {
RegionLocations newLocs = removeRegionLocation(oldLocs,
loc.getRegion().getReplicaId());
if (newLocs == null) {
if (tableCache.cache.remove(startKey, oldLocs)) {
+ recordClearRegionCache();
return;
}
} else {
if (tableCache.cache.replace(startKey, oldLocs, newLocs)) {
+ recordClearRegionCache();
return;
}
}
@@ -569,7 +591,7 @@ class AsyncNonMetaRegionLocator {
void updateCachedLocationOnError(HRegionLocation loc, Throwable exception) {
AsyncRegionLocatorHelper.updateCachedLocationOnError(loc, exception,
this::getCachedLocation,
- this::addLocationToCache, this::removeLocationFromCache);
+ this::addLocationToCache, this::removeLocationFromCache,
conn.getConnectionMetrics());
}
void clearCache(TableName tableName) {
@@ -583,6 +605,8 @@ class AsyncNonMetaRegionLocator {
tableCache.allRequests.values().forEach(f ->
f.completeExceptionally(error));
}
}
+ conn.getConnectionMetrics()
+ .ifPresent(metrics ->
metrics.incrMetaCacheNumClearRegion(tableCache.cache.size()));
}
void clearCache() {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
index 4dde1bb..2836e4b 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocatorHelper.java
@@ -21,6 +21,7 @@ import static
org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.findExcept
import static
org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil.isMetaClearingException;
import java.util.Arrays;
+import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.lang3.ObjectUtils;
@@ -51,7 +52,8 @@ final class AsyncRegionLocatorHelper {
static void updateCachedLocationOnError(HRegionLocation loc, Throwable
exception,
Function<HRegionLocation, HRegionLocation> cachedLocationSupplier,
- Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation>
removeFromCache) {
+ Consumer<HRegionLocation> addToCache, Consumer<HRegionLocation>
removeFromCache,
+ Optional<MetricsConnection> metrics) {
HRegionLocation oldLoc = cachedLocationSupplier.apply(loc);
if (LOG.isDebugEnabled()) {
LOG.debug("Try updating {} , the old value is {}, error={}", loc, oldLoc,
@@ -78,6 +80,7 @@ final class AsyncRegionLocatorHelper {
addToCache.accept(newLoc);
} else {
LOG.debug("Try removing {} from cache", loc);
+ metrics.ifPresent(m -> m.incrCacheDroppingExceptions(exception));
removeFromCache.accept(loc);
}
}
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index f5b0b03..2954e04 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -190,10 +190,10 @@ class ConnectionImplementation implements
ClusterConnection, Closeable {
// thread executor shared by all Table instances created
// by this connection
- private volatile ExecutorService batchPool = null;
+ private volatile ThreadPoolExecutor batchPool = null;
// meta thread executor shared by all Table instances created
// by this connection
- private volatile ExecutorService metaLookupPool = null;
+ private volatile ThreadPoolExecutor metaLookupPool = null;
private volatile boolean cleanupPool = false;
private final Configuration conf;
@@ -238,14 +238,13 @@ class ConnectionImplementation implements
ClusterConnection, Closeable {
* constructor
* @param conf Configuration object
*/
- ConnectionImplementation(Configuration conf,
- ExecutorService pool, User user) throws IOException
{
+ ConnectionImplementation(Configuration conf, ExecutorService pool, User
user) throws IOException {
this.conf = conf;
this.user = user;
if (user != null && user.isLoginFromKeytab()) {
spawnRenewalChore(user.getUGI());
}
- this.batchPool = pool;
+ this.batchPool = (ThreadPoolExecutor) pool;
this.connectionConfig = new ConnectionConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
@@ -286,7 +285,8 @@ class ConnectionImplementation implements
ClusterConnection, Closeable {
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = new AsyncProcess(this, conf, rpcCallerFactory,
rpcControllerFactory);
if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
- this.metrics = new MetricsConnection(this);
+ this.metrics =
+ new MetricsConnection(this.toString(), this::getBatchPool,
this::getMetaLookupPool);
} else {
this.metrics = null;
}
@@ -461,7 +461,7 @@ class ConnectionImplementation implements
ClusterConnection, Closeable {
return this.metrics;
}
- private ExecutorService getBatchPool() {
+ private ThreadPoolExecutor getBatchPool() {
if (batchPool == null) {
synchronized (this) {
if (batchPool == null) {
@@ -474,7 +474,7 @@ class ConnectionImplementation implements
ClusterConnection, Closeable {
return this.batchPool;
}
- private ExecutorService getThreadPool(int maxThreads, int coreThreads,
String nameHint,
+ private ThreadPoolExecutor getThreadPool(int maxThreads, int coreThreads,
String nameHint,
BlockingQueue<Runnable> passedWorkQueue) {
// shared HTable thread executor not yet initialized
if (maxThreads == 0) {
@@ -503,7 +503,7 @@ class ConnectionImplementation implements
ClusterConnection, Closeable {
return tpe;
}
- private ExecutorService getMetaLookupPool() {
+ private ThreadPoolExecutor getMetaLookupPool() {
if (this.metaLookupPool == null) {
synchronized (this) {
if (this.metaLookupPool == null) {
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 101dda0..6b06a7f 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -28,6 +28,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
@@ -500,13 +501,19 @@ public final class ConnectionUtils {
/**
* Connect the two futures, if the src future is done, then mark the dst
future as done. And if
* the dst future is done, then cancel the src future. This is used for
timeline consistent read.
+ * <p/>
+ * Pass empty metrics when linking the primary future to the dst future, so
that we do not
+ * increase the hedged read related metrics.
*/
- private static <T> void connect(CompletableFuture<T> srcFuture,
CompletableFuture<T> dstFuture) {
+ private static <T> void connect(CompletableFuture<T> srcFuture,
CompletableFuture<T> dstFuture,
+ Optional<MetricsConnection> metrics) {
addListener(srcFuture, (r, e) -> {
if (e != null) {
dstFuture.completeExceptionally(e);
} else {
- dstFuture.complete(r);
+ if (dstFuture.complete(r)) {
+ metrics.ifPresent(MetricsConnection::incrHedgedReadWin);
+ }
}
});
// The cancellation may be a dummy one as the dstFuture may be completed
by this srcFuture.
@@ -519,7 +526,7 @@ public final class ConnectionUtils {
private static <T> void sendRequestsToSecondaryReplicas(
Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations
locs,
- CompletableFuture<T> future) {
+ CompletableFuture<T> future, Optional<MetricsConnection> metrics) {
if (future.isDone()) {
// do not send requests to secondary replicas if the future is done,
i.e, the primary request
// has already been finished.
@@ -527,14 +534,15 @@ public final class ConnectionUtils {
}
for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
- connect(secondaryFuture, future);
+ metrics.ifPresent(MetricsConnection::incrHedgedReadOps);
+ connect(secondaryFuture, future, metrics);
}
}
static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator
locator,
TableName tableName, Query query, byte[] row, RegionLocateType
locateType,
Function<Integer, CompletableFuture<T>> requestReplica, long
rpcTimeoutNs,
- long primaryCallTimeoutNs, Timer retryTimer) {
+ long primaryCallTimeoutNs, Timer retryTimer, Optional<MetricsConnection>
metrics) {
if (query.getConsistency() == Consistency.STRONG) {
return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
}
@@ -545,7 +553,7 @@ public final class ConnectionUtils {
// Timeline consistent read, where we may send requests to other region
replicas
CompletableFuture<T> primaryFuture =
requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
CompletableFuture<T> future = new CompletableFuture<>();
- connect(primaryFuture, future);
+ connect(primaryFuture, future, Optional.empty());
long startNs = System.nanoTime();
// after the getRegionLocations, all the locations for the replicas of
this region should have
// been cached, so it is not big deal to locate them again when actually
sending requests to
@@ -567,11 +575,11 @@ public final class ConnectionUtils {
}
long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
if (delayNs <= 0) {
- sendRequestsToSecondaryReplicas(requestReplica, locs, future);
+ sendRequestsToSecondaryReplicas(requestReplica, locs, future,
metrics);
} else {
retryTimer.newTimeout(
- timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs,
future), delayNs,
- TimeUnit.NANOSECONDS);
+ timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs,
future, metrics),
+ delayNs, TimeUnit.NANOSECONDS);
}
});
return future;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index ac1a02a..c62a712 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -33,7 +33,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
+import java.util.function.Supplier;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
@@ -305,30 +305,30 @@ public class MetricsConnection implements
StatisticTrackable {
private final ConcurrentMap<String, Counter> cacheDroppingExceptions =
new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
- MetricsConnection(final ConnectionImplementation conn) {
- this.scope = conn.toString();
+ MetricsConnection(String scope, Supplier<ThreadPoolExecutor> batchPool,
+ Supplier<ThreadPoolExecutor> metaPool) {
+ this.scope = scope;
this.registry = new MetricRegistry();
-
this.registry.register(getExecutorPoolName(),
new RatioGauge() {
@Override
protected Ratio getRatio() {
- ThreadPoolExecutor batchPool = (ThreadPoolExecutor)
conn.getCurrentBatchPool();
- if (batchPool == null) {
+ ThreadPoolExecutor pool = batchPool.get();
+ if (pool == null) {
return Ratio.of(0, 0);
}
- return Ratio.of(batchPool.getActiveCount(),
batchPool.getMaximumPoolSize());
+ return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
}
});
this.registry.register(getMetaPoolName(),
new RatioGauge() {
@Override
protected Ratio getRatio() {
- ThreadPoolExecutor metaPool = (ThreadPoolExecutor)
conn.getCurrentMetaLookupPool();
- if (metaPool == null) {
+ ThreadPoolExecutor pool = metaPool.get();
+ if (pool == null) {
return Ratio.of(0, 0);
}
- return Ratio.of(metaPool.getActiveCount(),
metaPool.getMaximumPoolSize());
+ return Ratio.of(pool.getActiveCount(), pool.getMaximumPoolSize());
}
});
this.metaCacheHits = registry.counter(name(this.getClass(),
"metaCacheHits", scope));
@@ -401,6 +401,11 @@ public class MetricsConnection implements
StatisticTrackable {
metaCacheNumClearRegion.inc();
}
+ /** Increment the number of meta cache drops requested for individual
regions by {@code count}. */
+ public void incrMetaCacheNumClearRegion(int count) {
+ metaCacheNumClearRegion.inc(count);
+ }
+
/** Increment the number of hedged read that have occurred. */
public void incrHedgedReadOps() {
hedgedReadOps.inc();
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 86f11b9..688c86f 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -232,7 +232,7 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
public CompletableFuture<Result> get(Get get) {
return timelineConsistentRead(conn.getLocator(), tableName, get,
get.getRow(),
RegionLocateType.CURRENT, replicaId -> get(get, replicaId),
readRpcTimeoutNs,
- conn.connConf.getPrimaryCallTimeoutNs(), retryTimer);
+ conn.connConf.getPrimaryCallTimeoutNs(), retryTimer,
conn.getConnectionMetrics());
}
@Override
diff --git
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
index 97a672d..bfbaf97 100644
---
a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
+++
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
@@ -22,9 +22,8 @@ import static org.junit.Assert.assertEquals;
import com.codahale.metrics.RatioGauge;
import com.codahale.metrics.RatioGauge.Ratio;
import java.io.IOException;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MetricsTests;
@@ -35,7 +34,6 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
@@ -57,13 +55,11 @@ public class TestMetricsConnection {
HBaseClassTestRule.forClass(TestMetricsConnection.class);
private static MetricsConnection METRICS;
- private static final ExecutorService BATCH_POOL =
Executors.newFixedThreadPool(2);
+ private static final ThreadPoolExecutor BATCH_POOL =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
@BeforeClass
public static void beforeClass() {
- ConnectionImplementation mocked =
Mockito.mock(ConnectionImplementation.class);
- Mockito.when(mocked.toString()).thenReturn("mocked-connection");
- Mockito.when(mocked.getCurrentBatchPool()).thenReturn(BATCH_POOL);
- METRICS = new MetricsConnection(mocked);
+ METRICS = new MetricsConnection("mocked-connection", () -> BATCH_POOL, ()
-> null);
}
@AfterClass