This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 4fde3b2ed0d HBASE-29233: Capture scan metrics at region level (#7132) (#6868)
4fde3b2ed0d is described below
commit 4fde3b2ed0db0145276cb73da59dc91418abe24a
Author: sanjeet006py <[email protected]>
AuthorDate: Thu Jul 3 00:40:40 2025 +0530
HBASE-29233: Capture scan metrics at region level (#7132) (#6868)
Signed-off-by: Viraj Jasani <[email protected]>
---
.gitignore | 1 +
.../hadoop/hbase/client/AbstractClientScanner.java | 12 +
.../hadoop/hbase/client/AsyncClientScanner.java | 15 +
.../apache/hadoop/hbase/client/ClientScanner.java | 22 +-
.../hadoop/hbase/client/ConnectionUtils.java | 14 +-
.../apache/hadoop/hbase/client/ImmutableScan.java | 11 +
.../java/org/apache/hadoop/hbase/client/Scan.java | 26 +-
.../hadoop/hbase/client/ScannerCallable.java | 2 +-
.../hbase/client/ScannerCallableWithReplicas.java | 5 +
.../client/metrics/RegionScanMetricsData.java | 80 +++
.../hadoop/hbase/client/metrics/ScanMetrics.java | 16 +-
.../client/metrics/ScanMetricsRegionInfo.java | 82 +++
.../hbase/client/metrics/ScanMetricsUtil.java | 88 +++
.../client/metrics/ServerSideScanMetrics.java | 121 +++-
.../hbase/client/ClientSideRegionScanner.java | 11 +-
.../hadoop/hbase/client/TableSnapshotScanner.java | 3 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 9 +-
.../hbase/regionserver/RegionScannerImpl.java | 7 +-
.../hbase/client/TestAsyncTableScanMetrics.java | 111 +++-
...AsyncTableScanMetricsWithScannerSuspending.java | 162 +++++
.../hbase/client/TestClientSideRegionScanner.java | 85 +++
.../hadoop/hbase/client/TestReplicasClient.java | 80 +++
.../hadoop/hbase/client/TestScanAttributes.java | 50 ++
.../hadoop/hbase/client/TestTableScanMetrics.java | 710 +++++++++++++++++++++
.../hbase/client/TestTableSnapshotScanner.java | 84 +++
25 files changed, 1750 insertions(+), 57 deletions(-)
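For reviewers, here is a minimal sketch of how the new client-side API added by this commit is meant to be used. The connection setup and the table name "my_table" are illustrative, not part of the change:

    import java.util.Map;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;

    public class RegionScanMetricsExample {
      public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection();
          Table table = conn.getTable(TableName.valueOf("my_table"))) { // hypothetical table
          Scan scan = new Scan();
          scan.setEnableScanMetricsByRegion(true); // new API; also enables overall scan metrics
          try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result r : scanner) {
              // drain the scanner so metrics accumulate
            }
            // New API: per-region snapshot keyed by encoded region name plus server name
            Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
              scanner.getScanMetrics().collectMetricsByRegion();
            byRegion.forEach((region, metrics) -> System.out.println(region + " -> " + metrics));
          }
        }
      }
    }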
diff --git a/.gitignore b/.gitignore
index 56ed1be0b81..666771f7e25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,4 @@ linklint/
.java-version
tmp
**/.flattened-pom.xml
+.vscode/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 48cec12f43c..ece657deb0a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public abstract class AbstractClientScanner implements ResultScanner {
protected ScanMetrics scanMetrics;
+ private boolean isScanMetricsByRegionEnabled = false;
/**
* Check and initialize if application wants to collect scan metrics
@@ -34,6 +35,9 @@ public abstract class AbstractClientScanner implements ResultScanner {
// check if application wants to collect scan metrics
if (scan.isScanMetricsEnabled()) {
scanMetrics = new ScanMetrics();
+ if (scan.isScanMetricsByRegionEnabled()) {
+ isScanMetricsByRegionEnabled = true;
+ }
}
}
@@ -46,4 +50,12 @@ public abstract class AbstractClientScanner implements ResultScanner {
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
+
+ protected void setIsScanMetricsByRegionEnabled(boolean isScanMetricsByRegionEnabled) {
+ this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
+ }
+
+ protected boolean isScanMetricsByRegionEnabled() {
+ return isScanMetricsByRegionEnabled;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index b61f5b80c9e..d58c8e60c8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -95,6 +95,8 @@ class AsyncClientScanner {
private final Map<String, byte[]> requestAttributes;
+ private final boolean isScanMetricsByRegionEnabled;
+
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
@@ -118,12 +120,17 @@ class AsyncClientScanner {
this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = createScanResultCache(scan);
this.requestAttributes = requestAttributes;
+ boolean isScanMetricsByRegionEnabled = false;
if (scan.isScanMetricsEnabled()) {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
+ if (this.scan.isScanMetricsByRegionEnabled()) {
+ isScanMetricsByRegionEnabled = true;
+ }
} else {
this.scanMetrics = null;
}
+ this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
/*
* Assumes that the `start()` method is called immediately after construction. If this is no
@@ -250,6 +257,9 @@ class AsyncClientScanner {
}
private void openScanner() {
+ if (this.isScanMetricsByRegionEnabled) {
+ scanMetrics.moveToNextRegion();
+ }
incRegionCountMetrics(scanMetrics);
openScannerTries.set(1);
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
@@ -265,6 +275,11 @@ class AsyncClientScanner {
span.end();
}
}
+ if (this.isScanMetricsByRegionEnabled) {
+ HRegionLocation loc = resp.loc;
+ this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
+ loc.getServerName());
+ }
startScan(resp);
}
});
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index df7f900830e..a453945e729 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -34,10 +34,12 @@ import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -135,6 +137,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.useScannerTimeoutForNextCalls = connectionConfiguration.isUseScannerTimeoutForNextCalls();
this.requestAttributes = requestAttributes;
+ if (scan.isScanMetricsByRegionEnabled() && scan.getConsistency() == Consistency.TIMELINE) {
+ scan.setEnableScanMetricsByRegion(false);
+ scanForMetrics.setEnableScanMetricsByRegion(false);
+ LOG.warn("Scan metrics by region is not supported for timeline consistency in HBase 2");
+ }
// check if application wants to collect scan metrics
initScanMetrics(scan);
@@ -259,6 +266,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
// clear the current region, we will set a new value to it after the first call of the new
// callable.
this.currentRegion = null;
+ if (isScanMetricsByRegionEnabled()) {
+ scanMetrics.moveToNextRegion();
+ }
this.callable = new ScannerCallableWithReplicas(getTable(), getConnection(),
createScannerCallable(), pool, primaryOperationTimeout, scan, getRetries(), readRpcTimeout,
scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
@@ -281,6 +291,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
if (currentRegion == null && updateCurrentRegion) {
currentRegion = callable.getHRegionInfo();
+ initScanMetricsRegionInfo();
}
return rrs;
}
@@ -469,7 +480,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
long currentTime = EnvironmentEdgeManager.currentTime();
if (this.scanMetrics != null) {
- this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - lastNext);
+ this.scanMetrics.addToCounter(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME,
+ currentTime - lastNext);
}
lastNext = currentTime;
// Groom the array of Results that we received back from the server before adding that
@@ -622,4 +634,12 @@ public abstract class ClientScanner extends AbstractClientScanner {
return nextWithSyncCache();
}
}
+
+ private void initScanMetricsRegionInfo() {
+ if (isScanMetricsByRegionEnabled()) {
+ HRegionLocation location = callable.getLocation();
+ String encodedRegionName = location.getRegion().getEncodedName();
+ scanMetrics.initScanMetricsRegionInfo(encodedRegionName, location.getServerName());
+ }
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index c930ea3ab3e..2beb107f2a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -367,9 +367,9 @@ public final class ConnectionUtils {
if (scanMetrics == null) {
return;
}
- scanMetrics.countOfRPCcalls.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.RPC_CALLS_METRIC_NAME, 1);
if (isRegionServerRemote) {
- scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME, 1);
}
}
@@ -377,9 +377,9 @@ public final class ConnectionUtils {
if (scanMetrics == null) {
return;
}
- scanMetrics.countOfRPCRetries.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.RPC_RETRIES_METRIC_NAME, 1);
if (isRegionServerRemote) {
- scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_RETRIES_METRIC_NAME, 1);
}
}
@@ -394,9 +394,9 @@ public final class ConnectionUtils {
resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
}
}
- scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+ scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize);
if (isRegionServerRemote) {
- scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
+ scanMetrics.addToCounter(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME, resultSize);
}
}
@@ -416,7 +416,7 @@ public final class ConnectionUtils {
if (scanMetrics == null) {
return;
}
- scanMetrics.countOfRegions.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
}
/**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
index b54541e60f4..ace1f2bf4d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
@@ -240,6 +240,12 @@ public final class ImmutableScan extends Scan {
"ImmutableScan does not allow access to setScanMetricsEnabled");
}
+ @Override
+ public Scan setEnableScanMetricsByRegion(final boolean enable) {
+ throw new UnsupportedOperationException(
+ "ImmutableScan does not allow access to setEnableScanMetricsByRegion");
+ }
+
@Override
@Deprecated
public Scan setAsyncPrefetch(boolean asyncPrefetch) {
@@ -420,6 +426,11 @@ public final class ImmutableScan extends Scan {
return this.delegateScan.isScanMetricsEnabled();
}
+ @Override
+ public boolean isScanMetricsByRegionEnabled() {
+ return this.delegateScan.isScanMetricsByRegionEnabled();
+ }
+
@Override
public Boolean isAsyncPrefetch() {
return this.delegateScan.isAsyncPrefetch();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 81186ce656e..c37ee1e35a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -130,6 +130,8 @@ public class Scan extends Query {
// define this attribute with the appropriate table name by calling
// scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
+ static private final String SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE =
+ "scan.attributes.metrics.byregion.enable";
/**
* -1 means no caching specified and the value of {@link HConstants#HBASE_CLIENT_SCANNER_CACHING}
@@ -1102,11 +1104,15 @@ public class Scan extends Query {
}
/**
- * Enable collection of {@link ScanMetrics}. For advanced users.
+ * Enable collection of {@link ScanMetrics}. For advanced users. Disabling scan metrics will
+ * also disable region level scan metrics.
* @param enabled Set to true to enable accumulating scan metrics
*/
public Scan setScanMetricsEnabled(final boolean enabled) {
setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.valueOf(enabled)));
+ if (!enabled) {
+ setEnableScanMetricsByRegion(false);
+ }
return this;
}
@@ -1239,4 +1245,22 @@ public class Scan extends Query {
public static Scan createScanFromCursor(Cursor cursor) {
return new Scan().withStartRow(cursor.getRow());
}
+
+ /**
+ * Enables region level scan metrics. If scan metrics are disabled, this first enables scan
+ * metrics and then region level scan metrics.
+ * @param enable Set to true to enable region level scan metrics.
+ */
+ public Scan setEnableScanMetricsByRegion(final boolean enable) {
+ if (enable) {
+ setScanMetricsEnabled(true);
+ }
+ setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE, Bytes.toBytes(enable));
+ return this;
+ }
+
+ public boolean isScanMetricsByRegionEnabled() {
+ byte[] attr = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE);
+ return attr != null && Bytes.toBoolean(attr);
+ }
}
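A short, standalone sketch of how the two Scan attributes above interact, following the setters in this diff:

    import org.apache.hadoop.hbase.client.Scan;

    public class ScanMetricsFlagsSketch {
      public static void main(String[] args) {
        Scan scan = new Scan();
        scan.setEnableScanMetricsByRegion(true);
        // Enabling region level metrics implicitly enables overall scan metrics.
        System.out.println(scan.isScanMetricsEnabled());          // true
        System.out.println(scan.isScanMetricsByRegionEnabled());  // true

        scan.setScanMetricsEnabled(false);
        // Disabling scan metrics also clears the by-region flag.
        System.out.println(scan.isScanMetricsByRegionEnabled());  // false
      }
    }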
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 883a1e06185..8ba8df0c6a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -228,7 +228,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
// when what we need is to open scanner against new location.
// Attach NSRE to signal client that it needs to re-setup scanner.
if (this.scanMetrics != null) {
- this.scanMetrics.countOfNSRE.incrementAndGet();
+ this.scanMetrics.addToCounter(ScanMetrics.NOT_SERVING_REGION_EXCEPTION_METRIC_NAME, 1);
}
throw new DoNotRetryIOException("Resetting the scanner -- see exception cause", ioe);
} else if (ioe instanceof RegionServerStoppedException) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 5261ff4af5c..242b9031a4e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
@@ -498,4 +499,8 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
public long sleep(long pause, int tries) {
return currentScannerCallable.sleep(pause, tries);
}
+
+ public HRegionLocation getLocation() {
+ return currentScannerCallable.getLocation();
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
new file mode 100644
index 00000000000..adece500167
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Captures region level scan metrics as a map of metric name ({@link String}) -> Value
+ * ({@link AtomicLong}). <br/>
+ * <br/>
+ * One instance stores scan metrics for a single region only.
+ */
[email protected]
+public class RegionScanMetricsData {
+ private final Map<String, AtomicLong> counters = new HashMap<>();
+ private ScanMetricsRegionInfo scanMetricsRegionInfo =
+ ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO;
+
+ AtomicLong createCounter(String counterName) {
+ return ScanMetricsUtil.createCounter(counters, counterName);
+ }
+
+ void setCounter(String counterName, long value) {
+ ScanMetricsUtil.setCounter(counters, counterName, value);
+ }
+
+ void addToCounter(String counterName, long delta) {
+ ScanMetricsUtil.addToCounter(counters, counterName, delta);
+ }
+
+ Map<String, Long> collectMetrics(boolean reset) {
+ return ScanMetricsUtil.collectMetrics(counters, reset);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[" + scanMetricsRegionInfo + "," + "Counters=" + counters
+ + "]";
+ }
+
+ /**
+ * Populate encoded region name and server name details if not already populated. If the details
+ * are already populated, a re-attempt throws {@link UnsupportedOperationException}.
+ */
+ void initScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+ // Check by reference
+ if (scanMetricsRegionInfo == ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO) {
+ scanMetricsRegionInfo = new ScanMetricsRegionInfo(encodedRegionName, serverName);
+ } else {
+ throw new UnsupportedOperationException(
+ "ScanMetricsRegionInfo has already been initialized to " +
scanMetricsRegionInfo
+ + " and cannot be re-initialized to region: " + encodedRegionName +
" and server: "
+ + serverName);
+ }
+ }
+
+ ScanMetricsRegionInfo getScanMetricsRegionInfo() {
+ return scanMetricsRegionInfo;
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
index e2038a17b37..8617a851509 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
@@ -96,8 +96,22 @@ public class ScanMetrics extends ServerSideScanMetrics {
public final AtomicLong countOfRemoteRPCRetries = createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
/**
- * constructor
+ * Constructor
*/
public ScanMetrics() {
}
+
+ @Override
+ public void moveToNextRegion() {
+ super.moveToNextRegion();
+ currentRegionScanMetricsData.createCounter(RPC_CALLS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(REMOTE_RPC_CALLS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BYTES_IN_RESULTS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(REGIONS_SCANNED_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(RPC_RETRIES_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
+ }
}
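Because moveToNextRegion() pre-creates every counter for the new region bucket, a per-region snapshot always contains the full set of metric names, with zeros for counters that never fired. A minimal sketch; the encoded region name and server name below are made up for illustration:

    import java.util.Map;
    import org.apache.hadoop.hbase.ServerName;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
    import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;

    public class MoveToNextRegionSketch {
      public static void main(String[] args) {
        ScanMetrics metrics = new ScanMetrics();
        // Start a per-region bucket; all counters are pre-created at 0.
        metrics.moveToNextRegion();
        metrics.initScanMetricsRegionInfo("1588230740", // hypothetical encoded region name
          ServerName.valueOf("host1,16020,1700000000000")); // hypothetical server
        metrics.addToCounter(ScanMetrics.RPC_CALLS_METRIC_NAME, 1);
        Map<ScanMetricsRegionInfo, Map<String, Long>> snapshot = metrics.collectMetricsByRegion();
        // Prints RPC_CALLS=1 alongside the zero-valued counters created above.
        snapshot.forEach((region, values) -> System.out.println(region + " -> " + values));
      }
    }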
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
new file mode 100644
index 00000000000..58b5b0a80db
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * POJO for capturing region level details when region level scan metrics are enabled. <br>
+ * <br>
+ * Currently, encoded region name and server name (host name, port and startcode) are captured as
+ * region details. <br>
+ * <br>
+ * An instance of this class serves as the key in the Map returned by
+ * {@link ServerSideScanMetrics#collectMetricsByRegion()} or
+ * {@link ServerSideScanMetrics#collectMetricsByRegion(boolean)}.
+ */
[email protected]
[email protected]
+public class ScanMetricsRegionInfo {
+ /**
+ * Users should only compare against this constant by reference and should not make any
+ * assumptions regarding content of the constant.
+ */
+ public static final ScanMetricsRegionInfo EMPTY_SCAN_METRICS_REGION_INFO =
+ new ScanMetricsRegionInfo(null, null);
+
+ private final String encodedRegionName;
+ private final ServerName serverName;
+
+ ScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+ this.encodedRegionName = encodedRegionName;
+ this.serverName = serverName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ScanMetricsRegionInfo)) {
+ return false;
+ }
+ ScanMetricsRegionInfo other = (ScanMetricsRegionInfo) obj;
+ return new EqualsBuilder().append(encodedRegionName, other.encodedRegionName)
+ .append(serverName, other.serverName).isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37).append(encodedRegionName).append(serverName).toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[encodedRegionName=" + encodedRegionName + ",serverName="
+ + serverName + "]";
+ }
+
+ public String getEncodedRegionName() {
+ return encodedRegionName;
+ }
+
+ public ServerName getServerName() {
+ return serverName;
+ }
+}
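Because EMPTY_SCAN_METRICS_REGION_INFO holds null fields, two uninitialized instances would also compare equal via equals(); that is why the javadoc above asks callers to compare against the constant by reference. A small sketch of that contract:

    import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;

    public class EmptyRegionInfoCheckSketch {
      // Returns true once region/server details have been populated. Compare by
      // reference: uninitialized entries share the EMPTY_SCAN_METRICS_REGION_INFO singleton.
      static boolean isInitialized(ScanMetricsRegionInfo info) {
        return info != ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO;
      }
    }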
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
new file mode 100644
index 00000000000..2e1dfb8a3c4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.yetus.audience.InterfaceAudience;
+
[email protected]
+public final class ScanMetricsUtil {
+
+ private ScanMetricsUtil() {
+ }
+
+ /**
+ * Creates a new counter with the specified name and stores it in the counters map.
+ * @return {@link AtomicLong} instance for the counter with counterName
+ */
+ static AtomicLong createCounter(Map<String, AtomicLong> counters, String counterName) {
+ AtomicLong c = new AtomicLong(0);
+ counters.put(counterName, c);
+ return c;
+ }
+
+ /**
+ * Sets counter with counterName to passed in value, does nothing if counter does not exist.
+ */
+ static void setCounter(Map<String, AtomicLong> counters, String counterName, long value) {
+ AtomicLong c = counters.get(counterName);
+ if (c != null) {
+ c.set(value);
+ }
+ }
+
+ /**
+ * Increments the counter with counterName by delta, does nothing if counter does not exist.
+ */
+ static void addToCounter(Map<String, AtomicLong> counters, String counterName, long delta) {
+ AtomicLong c = counters.get(counterName);
+ if (c != null) {
+ c.addAndGet(delta);
+ }
+ }
+
+ /**
+ * Returns true if a counter exists with the counterName.
+ */
+ static boolean hasCounter(Map<String, AtomicLong> counters, String counterName) {
+ return counters.containsKey(counterName);
+ }
+
+ /**
+ * Returns {@link AtomicLong} instance for this counter name, null if counter does not exist.
+ */
+ static AtomicLong getCounter(Map<String, AtomicLong> counters, String counterName) {
+ return counters.get(counterName);
+ }
+
+ /**
+ * Get all the values. If reset is true, we will reset all the AtomicLongs back to 0.
+ * @param reset whether to reset the AtomicLongs to 0.
+ * @return A Map of String -> Long for metrics
+ */
+ static Map<String, Long> collectMetrics(Map<String, AtomicLong> counters, boolean reset) {
+ Map<String, Long> metricsSnapshot = new HashMap<>();
+ for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
+ long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
+ metricsSnapshot.put(e.getKey(), value);
+ }
+ return metricsSnapshot;
+ }
+}
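The collect-and-reset semantics rely on AtomicLong.getAndSet, so a concurrent increment is never lost between reading and zeroing a counter. The helper above is package-private; this is an equivalent standalone sketch of the same logic:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    public class CollectMetricsSketch {
      // Mirrors ScanMetricsUtil.collectMetrics: snapshot all values, optionally resetting to 0.
      static Map<String, Long> collectMetrics(Map<String, AtomicLong> counters, boolean reset) {
        Map<String, Long> snapshot = new HashMap<>();
        for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
          snapshot.put(e.getKey(), reset ? e.getValue().getAndSet(0) : e.getValue().get());
        }
        return snapshot;
      }

      public static void main(String[] args) {
        Map<String, AtomicLong> counters = new HashMap<>();
        counters.put("RPC_CALLS", new AtomicLong(3));
        System.out.println(collectMetrics(counters, true));  // {RPC_CALLS=3}
        System.out.println(counters.get("RPC_CALLS").get()); // 0, atomically reset
      }
    }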
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index 93fd8890d33..916451df75d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.client.metrics;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -31,18 +35,31 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@SuppressWarnings("checkstyle:VisibilityModifier") // See HBASE-27757
public class ServerSideScanMetrics {
/**
- * Hash to hold the String -> Atomic Long mappings for each metric
+ * Hash to hold the String -> Atomic Long mappings for each metric
*/
private final Map<String, AtomicLong> counters = new HashMap<>();
+ private final List<RegionScanMetricsData> regionScanMetricsData = new ArrayList<>(0);
+ protected RegionScanMetricsData currentRegionScanMetricsData = null;
/**
- * Create a new counter with the specified name
+ * If region level scan metrics are enabled, this method must be called before scanning each
+ * region to start collecting metrics for that region.
+ */
+ public void moveToNextRegion() {
+ currentRegionScanMetricsData = new RegionScanMetricsData();
+ regionScanMetricsData.add(currentRegionScanMetricsData);
+ currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(FS_READ_TIME_METRIC_NAME);
+ }
+
+ /**
+ * Create a new counter with the specified name.
* @return {@link AtomicLong} instance for the counter with counterName
*/
protected AtomicLong createCounter(String counterName) {
- AtomicLong c = new AtomicLong(0);
- counters.put(counterName, c);
- return c;
+ return ScanMetricsUtil.createCounter(counters, counterName);
}
public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
@@ -85,52 +102,106 @@ public class ServerSideScanMetrics {
public final AtomicLong fsReadTime = createCounter(FS_READ_TIME_METRIC_NAME);
+ /**
+ * Sets counter with counterName to passed in value, does nothing if counter does not exist. If
+ * region level scan metrics are enabled then sets the value of the counter for the current
+ * region being scanned.
+ */
public void setCounter(String counterName, long value) {
- AtomicLong c = this.counters.get(counterName);
- if (c != null) {
- c.set(value);
+ ScanMetricsUtil.setCounter(counters, counterName, value);
+ if (this.currentRegionScanMetricsData != null) {
+ this.currentRegionScanMetricsData.setCounter(counterName, value);
}
}
- /** Returns true if a counter exists with the counterName */
+ /**
+ * Returns true if a counter exists with the counterName.
+ */
public boolean hasCounter(String counterName) {
- return this.counters.containsKey(counterName);
+ return ScanMetricsUtil.hasCounter(counters, counterName);
}
- /** Returns {@link AtomicLong} instance for this counter name, null if counter does not exist. */
+ /**
+ * Returns {@link AtomicLong} instance for this counter name, null if counter does not exist.
+ */
+ */
public AtomicLong getCounter(String counterName) {
- return this.counters.get(counterName);
+ return ScanMetricsUtil.getCounter(counters, counterName);
}
+ /**
+ * Increments the counter with counterName by delta, does nothing if counter does not exist. If
+ * region level scan metrics are enabled then increments the counter corresponding to the current
+ * region being scanned. Please see {@link #moveToNextRegion()}.
+ */
public void addToCounter(String counterName, long delta) {
- AtomicLong c = this.counters.get(counterName);
- if (c != null) {
- c.addAndGet(delta);
+ ScanMetricsUtil.addToCounter(counters, counterName, delta);
+ if (this.currentRegionScanMetricsData != null) {
+ this.currentRegionScanMetricsData.addToCounter(counterName, delta);
}
}
/**
- * Get all of the values since the last time this function was called. Calling this function will
- * reset all AtomicLongs in the instance back to 0.
- * @return A Map of String -> Long for metrics
+ * Get all the values combined for all the regions since the last time this function was called.
+ * Calling this function will reset all AtomicLongs in the instance back to 0.
+ * @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap() {
return getMetricsMap(true);
}
/**
- * Get all of the values. If reset is true, we will reset the all AtomicLongs back to 0.
+ * Get all the values combined for all the regions. If reset is true, we will reset all the
+ * AtomicLongs back to 0.
* @param reset whether to reset the AtomicLongs to 0.
- * @return A Map of String -> Long for metrics
+ * @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap(boolean reset) {
+ return ImmutableMap.copyOf(ScanMetricsUtil.collectMetrics(counters, reset));
+ }
+
+ /**
+ * Get values grouped by each region scanned since the last time this was called. Calling this
+ * function will reset all region level scan metrics counters back to 0.
+ * @return A Map of region -> (Map of metric name -> Long) for metrics
+ */
+ public Map<ScanMetricsRegionInfo, Map<String, Long>> collectMetricsByRegion() {
+ return collectMetricsByRegion(true);
+ }
+
+ /**
+ * Get values grouped by each region scanned. If reset is true, will reset all the region level
+ * scan metrics counters back to 0.
+ * @param reset whether to reset region level scan metric counters to 0.
+ * @return A Map of region -> (Map of metric name -> Long) for metrics
+ */
+ public Map<ScanMetricsRegionInfo, Map<String, Long>> collectMetricsByRegion(boolean reset) {
// Create a builder
- ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
- for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
- long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
- builder.put(e.getKey(), value);
+ ImmutableMap.Builder<ScanMetricsRegionInfo, Map<String, Long>> builder = ImmutableMap.builder();
+ for (RegionScanMetricsData regionScanMetricsData : this.regionScanMetricsData) {
+ if (
+ regionScanMetricsData.getScanMetricsRegionInfo()
+ == ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO
+ ) {
+ continue;
+ }
+ builder.put(regionScanMetricsData.getScanMetricsRegionInfo(),
+ regionScanMetricsData.collectMetrics(reset));
}
- // Build the immutable map so that people can't mess around with it.
return builder.build();
}
+
+ @Override
+ public String toString() {
+ return counters + "," + regionScanMetricsData.stream().map(RegionScanMetricsData::toString)
+ .collect(Collectors.joining(","));
+ }
+
+ /**
+ * Call this method after calling {@link #moveToNextRegion()} to populate server name and encoded
+ * region name details for the region being scanned and for which metrics are being collected at
+ * the moment.
+ */
+ public void initScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+ currentRegionScanMetricsData.initScanMetricsRegionInfo(encodedRegionName, serverName);
+ }
}
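One consequence of the reset behavior above: the no-argument getMetricsMap() and collectMetricsByRegion() each zero their own counters, so code that wants both the overall and the per-region view of the same scan should pass reset=false. An illustrative sketch; the metrics object would normally be populated by a scanner:

    import java.util.Map;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
    import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;

    public class ResetSemanticsSketch {
      static void report(ScanMetrics metrics) {
        // reset=false: read both views of the same counters without zeroing them.
        Map<String, Long> overall = metrics.getMetricsMap(false);
        Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
          metrics.collectMetricsByRegion(false);
        System.out.println("overall = " + overall);
        System.out.println("byRegion = " + byRegion);
      }
    }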
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 144c01de874..df99fd40338 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.mob.MobFileCache;
@@ -90,6 +91,12 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
initScanMetrics(scan);
} else {
this.scanMetrics = scanMetrics;
+ setIsScanMetricsByRegionEnabled(scan.isScanMetricsByRegionEnabled());
+ }
+ if (isScanMetricsByRegionEnabled()) {
+ this.scanMetrics.moveToNextRegion();
+ this.scanMetrics.initScanMetricsRegionInfo(region.getRegionInfo().getEncodedName(), null);
+ // The server name will be null in scan metrics as this is a client side region scanner
}
region.startRegionOperation();
}
@@ -110,8 +117,8 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
for (Cell cell : values) {
resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
}
- this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
- this.scanMetrics.countOfRowsScanned.incrementAndGet();
+ this.scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize);
+ this.scanMetrics.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
}
return result;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
index 872c9a163c6..9e01e34877c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -187,7 +188,7 @@ public class TableSnapshotScanner extends AbstractClientScanner {
currentRegionScanner =
new ClientSideRegionScanner(conf, fs, restoreDir, htd, hri, scan, scanMetrics);
if (this.scanMetrics != null) {
- this.scanMetrics.countOfRegions.incrementAndGet();
+ this.scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 26006676596..00ba1233424 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
@@ -3567,10 +3568,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
if (trackMetrics) {
// rather than increment yet another counter in StoreScanner, just set the value here
// from block size progress before writing into the response
- scannerContext.getMetrics().countOfBlockBytesScanned
- .set(scannerContext.getBlockSizeProgress());
+ scannerContext.getMetrics().setCounter(
+ ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
+ scannerContext.getBlockSizeProgress());
if (rpcCall != null) {
- scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
+ scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
+ rpcCall.getFsReadTime());
}
Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index def22fc32a0..67cc9b7eba4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
@@ -643,7 +644,8 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
return;
}
- scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
+ scannerContext.getMetrics()
+ .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
}
private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
@@ -651,7 +653,8 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
return;
}
- scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
+ scannerContext.getMetrics()
+ .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
}
/** Returns true when the joined heap may have data for the current row */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
index aebdb5f3a13..f9209c246d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -17,19 +17,28 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -93,12 +102,12 @@ public class TestAsyncTableScanMetrics {
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(3);
- // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
- // scan are forced to hit all the regions.
+ // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
+ // the scan hits all the regions and the rows do not all lie in a single region
try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
- table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ,
VALUE),
- new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE),
- new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE)));
+ table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ,
VALUE),
+ new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
}
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
@@ -141,25 +150,101 @@ public class TestAsyncTableScanMetrics {
}
@Test
- public void testNoScanMetrics() throws Exception {
+ public void testScanMetricsDisabled() throws Exception {
Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
assertEquals(3, pair.getFirst().size());
+ // Assert no scan metrics
assertNull(pair.getSecond());
}
@Test
- public void testScanMetrics() throws Exception {
- Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true));
+ public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
+ Scan scan = new Scan();
+ scan.setScanMetricsEnabled(true);
+ Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
List<Result> results = pair.getFirst();
assertEquals(3, results.size());
- long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
- .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
+ long bytes = getBytesOfResults(results);
+ ScanMetrics scanMetrics = pair.getSecond();
+ assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+ assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+ assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
+ // Assert scan metrics have not been collected by region
+ assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+ }
+
+ @Test
+ public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+ Scan scan = new Scan();
+ scan.withStartRow(Bytes.toBytes("zzz1"), true);
+ scan.withStopRow(Bytes.toBytes("zzz1"), true);
+ scan.setEnableScanMetricsByRegion(true);
+ Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+ List<Result> results = pair.getFirst();
+ assertEquals(1, results.size());
+ long bytes = getBytesOfResults(results);
+ ScanMetrics scanMetrics = pair.getSecond();
+ assertEquals(1, scanMetrics.countOfRegions.get());
+ assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+ assertEquals(1, scanMetrics.countOfRPCcalls.get());
+ // Assert scan metrics by region were collected for the region scanned
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo smri = entry.getKey();
+ Map<String, Long> metrics = entry.getValue();
+ assertNotNull(smri.getServerName());
+ assertNotNull(smri.getEncodedRegionName());
+ // Assert overall scan metrics and scan metrics by region should be equal as only 1 region
+ // was scanned.
+ assertEquals(scanMetrics.getMetricsMap(false), metrics);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+ Scan scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+ List<Result> results = pair.getFirst();
+ assertEquals(3, results.size());
+ long bytes = getBytesOfResults(results);
ScanMetrics scanMetrics = pair.getSecond();
+ Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+ assertEquals(NUM_REGIONS, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+ assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+ assertEquals(NUM_REGIONS, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
- // also assert a server side metric to ensure that we have published them into the client side
- // metrics.
- assertEquals(3, scanMetrics.countOfRowsScanned.get());
+ // Assert scan metrics by region were collected for the region scanned
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
+ int rowsScannedAcrossAllRegions = 0;
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo smri = entry.getKey();
+ Map<String, Long> perRegionMetrics = entry.getValue();
+ assertNotNull(smri.getServerName());
+ assertNotNull(smri.getEncodedRegionName());
+ assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+ if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
+ bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+ assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ rowsScannedAcrossAllRegions++;
+ } else {
+ assertEquals(0, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ assertEquals(0, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ }
+ }
+ assertEquals(3, rowsScannedAcrossAllRegions);
+ }
+
+ static long getBytesOfResults(List<Result> results) {
+ return results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
+ .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
new file mode 100644
index 00000000000..53eaccaec0e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanMetricsWithScannerSuspending {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableScanMetricsWithScannerSuspending.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final TableName TABLE_NAME =
+ TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName());
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static AsyncConnection CONN;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniCluster(1);
+ // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
+ // the scan hits all the regions and the rows do not all lie in a single region
+ try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+ table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ,
VALUE),
+ new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
+ }
+ CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ Closeables.close(CONN, true);
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithScannerSuspending() throws Exception {
+ // Setup scan
+ Scan scan = new Scan();
+ scan.withStartRow(Bytes.toBytes("xxx1"), true);
+ scan.withStopRow(Bytes.toBytes("zzz1"), true);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(1);
+
+ // Prepare scanner
+ final AtomicInteger rowsReadCounter = new AtomicInteger(0);
+ AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
+ @Override
+ public void onNext(Result[] results, ScanController controller) {
+ rowsReadCounter.addAndGet(results.length);
+ super.onNext(results, controller);
+ }
+ };
+
+ // Do the scan so that rows get loaded in the scanner (consumer)
+ CONN.getTable(TABLE_NAME).scan(scan, scanner);
+
+ List<Result> results = new ArrayList<>();
+ int expectedTotalRows = 3;
+ // Assert that only 1 row has been loaded so far as maxCacheSize is set to 1 byte
+ for (int i = 1; i <= expectedTotalRows; i++) {
+ UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return scanner.isSuspended();
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "The given scanner has been suspended in time";
+ }
+ });
+ assertTrue(scanner.isSuspended());
+ assertEquals(i, rowsReadCounter.get());
+ results.add(scanner.next());
+ }
+ Assert.assertNull(scanner.next());
+
+ // Assert on overall scan metrics and scan metrics by region
+ ScanMetrics scanMetrics = scanner.getScanMetrics();
+ // Assert on overall scan metrics
+ long bytes = getBytesOfResults(results);
+ Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+ assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+ assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ // 1 extra RPC call per region where no row is returned but moreResultsInRegion is set to false
+ assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
+ // Assert scan metrics by region were collected for the region scanned
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ assertEquals(3, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo smri = entry.getKey();
+ Map<String, Long> perRegionMetrics = entry.getValue();
+ assertNotNull(smri.getServerName());
+ assertNotNull(smri.getEncodedRegionName());
+ assertEquals(1, (long)
perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+ assertEquals(1, (long)
perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+ assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME));
+ }
+ }
+}
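A minimal client-side sketch of the API these tests exercise; the connection, table name and row handling here are illustrative, not part of the patch:

    Scan scan = new Scan();
    scan.setEnableScanMetricsByRegion(true); // implies setScanMetricsEnabled(true)
    try (Table table = connection.getTable(TableName.valueOf("t1"));
      ResultScanner scanner = table.getScanner(scan)) {
      for (Result r : scanner) {
        // consume rows
      }
      ScanMetrics scanMetrics = scanner.getScanMetrics();
      // Pass false to read the counters without resetting them
      Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
        scanMetrics.collectMetricsByRegion(false);
      byRegion.forEach((region, metrics) -> System.out.println(
        region.getEncodedRegionName() + " on " + region.getServerName() + " -> " + metrics));
    }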
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
index e0e7187da91..78041f8b972 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -179,6 +184,86 @@ public class TestClientSideRegionScanner {
}
}
+ @Test
+ public void testScanMetricsDisabled() throws IOException {
+ Configuration copyConf = new Configuration(conf);
+ Scan scan = new Scan();
+ try (ClientSideRegionScanner clientSideRegionScanner =
+ new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
+ clientSideRegionScanner.next();
+ assertNull(clientSideRegionScanner.getScanMetrics());
+ }
+ }
+
+ private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics scanMetrics)
+ throws IOException {
+ Configuration copyConf = new Configuration(conf);
+ Scan scan = new Scan();
+ scan.setScanMetricsEnabled(true);
+ TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+ try (ClientSideRegionScanner clientSideRegionScanner =
+ new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
+ clientSideRegionScanner.next();
+ ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
+ assertNotNull(scanMetricsFromScanner);
+ if (scanMetrics != null) {
+ Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+ }
+ Map<String, Long> metricsMap = scanMetricsFromScanner.getMetricsMap(false);
+ Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
+ Assert.assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
+ }
+ }
+
+ @Test
+ public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() throws IOException {
+ testScanMetricsWithScanMetricsByRegionDisabled(null);
+ }
+
+ @Test
+ public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws IOException {
+ testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
+ }
+
+ private void testScanMetricByRegion(ScanMetrics scanMetrics) throws IOException {
+ Configuration copyConf = new Configuration(conf);
+ Scan scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+ try (ClientSideRegionScanner clientSideRegionScanner =
+ new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
+ clientSideRegionScanner.next();
+ ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
+ assertNotNull(scanMetricsFromScanner);
+ if (scanMetrics != null) {
+ Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+ }
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetricsFromScanner.collectMetricsByRegion();
+ Assert.assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertEquals(hri.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
+ Assert.assertEquals((long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
+ scanMetricsFromScanner.countOfRowsScanned.get());
+ }
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws IOException {
+ testScanMetricByRegion(null);
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException {
+ testScanMetricByRegion(new ScanMetrics());
+ }
+
private static Put createPut(int rowAsInt) {
byte[] row = Bytes.toBytes(rowAsInt);
Put put = new Put(row);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 4c30c2edff0..5d372ac7b70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -26,6 +28,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -42,8 +45,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -58,6 +63,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -94,6 +100,7 @@ public class TestReplicasClient {
private static final byte[] f = HConstants.CATALOG_FAMILY;
private final static int REFRESH_PERIOD = 1000;
+ private static ServerName rsServerName;
/**
* This copro is used to synchronize the tests.
@@ -232,6 +239,9 @@ public class TestReplicasClient {
Configuration c = new Configuration(HTU.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
LOG.info("Master has stopped");
+
+ rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
+ Assert.assertNotNull(rsServerName);
}
@AfterClass
@@ -777,4 +787,74 @@ public class TestReplicasClient {
}
}
}
+
+ @Test
+ public void testScanMetricsByRegion() throws Exception {
+ byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
+ openRegion(hriSecondary);
+
+ try {
+ Put p = new Put(b1);
+ p.addColumn(f, b1, b1);
+ table.put(p);
+ LOG.info("Put done");
+ flushRegion(hriPrimary);
+
+ // Sleep for 2 * REFRESH_PERIOD so that flushed data is visible to secondary replica
+ Thread.sleep(2 * REFRESH_PERIOD);
+
+ // Explicitly read replica 0
+ Scan scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ scan.withStartRow(b1, true);
+ scan.withStopRow(b1, true);
+ // Assert row was read from primary replica along with asserting scan metrics by region
+ assertScanMetrics(scan, hriPrimary, false);
+ LOG.info("Scanned primary replica");
+
+ // Read from region replica
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
+ scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ scan.withStartRow(b1, true);
+ scan.withStopRow(b1, true);
+ scan.setConsistency(Consistency.TIMELINE);
+ // Assert row was read from secondary replica along with asserting no scan metrics by region
+ // for timeline consistency
+ assertScanMetrics(scan, null, true);
+ LOG.info("Scanned secondary replica ");
+ } finally {
+ SlowMeCopro.getPrimaryCdl().get().countDown();
+ Delete d = new Delete(b1);
+ table.delete(d);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale)
+ throws IOException {
+ try (ResultScanner rs = table.getScanner(scan)) {
+ for (Result r : rs) {
+ Assert.assertEquals(isStale, r.isStale());
+ Assert.assertFalse(r.isEmpty());
+ }
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ rs.getScanMetrics().collectMetricsByRegion(false);
+ if (isStale) {
+ Assert.assertTrue(scanMetricsByRegion.isEmpty());
+ } else {
+ Assert.assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metrics = entry.getValue();
+ Assert.assertEquals(rsServerName, scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(regionInfo.getEncodedName(),
+ scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(1, (long) metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+ }
+ }
}
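A sketch of the TIMELINE-consistency behaviour the test above asserts (variable names illustrative): per-region metrics are only reported when the row is served by the primary replica.

    Scan scan = new Scan().withStartRow(row, true).withStopRow(row, true);
    scan.setConsistency(Consistency.TIMELINE);
    scan.setEnableScanMetricsByRegion(true);
    try (ResultScanner rs = table.getScanner(scan)) {
      for (Result r : rs) {
        if (r.isStale()) {
          // Served by a secondary replica: collectMetricsByRegion(false) is empty
        }
      }
    }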
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
new file mode 100644
index 00000000000..36935511734
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestScanAttributes {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestScanAttributes.class);
+
+ @Test
+ public void testCoEnableAndCoDisableScanMetricsAndScanMetricsByRegion() {
+ Scan scan = new Scan();
+ Assert.assertFalse(scan.isScanMetricsEnabled());
+ Assert.assertFalse(scan.isScanMetricsByRegionEnabled());
+
+ // Assert enabling scan metrics by region enables scan metrics also
+ scan.setEnableScanMetricsByRegion(true);
+ Assert.assertTrue(scan.isScanMetricsEnabled());
+ Assert.assertTrue(scan.isScanMetricsByRegionEnabled());
+
+ // Assert disabling scan metrics disables scan metrics by region
+ scan.setScanMetricsEnabled(false);
+ Assert.assertFalse(scan.isScanMetricsEnabled());
+ Assert.assertFalse(scan.isScanMetricsByRegionEnabled());
+ }
+}
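The flag coupling asserted above reduces to the following sketch, using only the Scan methods added or touched by this patch:

    Scan scan = new Scan();
    scan.setEnableScanMetricsByRegion(true); // also enables overall scan metrics
    assert scan.isScanMetricsEnabled() && scan.isScanMetricsByRegionEnabled();
    scan.setScanMetricsEnabled(false); // also disables the by-region flag
    assert !scan.isScanMetricsByRegionEnabled();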
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
new file mode 100644
index 00000000000..6f84ddcc314
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.NOT_SERVING_REGION_EXCEPTION_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestTableScanMetrics extends FromClientSideBase {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestTableScanMetrics.class);
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static final TableName TABLE_NAME =
+ TableName.valueOf(TestTableScanMetrics.class.getSimpleName());
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static final Random RAND = new Random(11);
+
+ private static int NUM_REGIONS;
+
+ private static Connection CONN;
+
+ @Parameters(name = "{index}: scanner={0}")
+ public static List<Object[]> params() {
+ return Arrays.asList(new Object[] { "ForwardScanner", new Scan() },
+ new Object[] { "ReverseScanner", new Scan().setReversed(true) });
+ }
+
+ @Parameter(0)
+ public String scannerName;
+
+ @Parameter(1)
+ public Scan originalScan;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // Start the minicluster
+ TEST_UTIL.startMiniCluster(2);
+ // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
+ // the scan hits all the regions and not all rows lie in a single region
+ try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+ table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
+ }
+ CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+ NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
+ Scan scan = new Scan(originalScan);
+ if (originalScan.isReversed()) {
+ scan.withStartRow(largerRow, true);
+ scan.withStopRow(smallerRow, true);
+ } else {
+ scan.withStartRow(smallerRow, true);
+ scan.withStopRow(largerRow, true);
+ }
+ return scan;
+ }
+
+ private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
+ throws IOException {
+ int countOfRows = 0;
+ ScanMetrics scanMetrics;
+ try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
+ for (Result result : scanner) {
+ Assert.assertFalse(result.isEmpty());
+ countOfRows++;
+ }
+ scanMetrics = scanner.getScanMetrics();
+ }
+ Assert.assertEquals(expectedCount, countOfRows);
+ return scanMetrics;
+ }
+
+ @Test
+ public void testScanMetricsDisabled() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
+ Assert.assertNull(scanMetrics);
+ }
+
+ @Test
+ public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ scan.setScanMetricsEnabled(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
+ // The test setup is such that we have 1 row per region in the scan range
+ Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+ }
+
+ @Test
+ public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ scan.setScanMetricsEnabled(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ // By default counters are collected with reset as true
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+ Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ // Subsequent call to get scan metrics map should show all counters as 0
+ Assert.assertEquals(0, scanMetrics.countOfRegions.get());
+ Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get());
+ }
+
+ @Test
+ public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
+ scan.setEnableScanMetricsByRegion(true);
+ int expectedRowsScanned = 1;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
+ Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ // As we are scanning a single row, overall scan metrics will match per-region scan metrics
+ Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+ Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ scan.setEnableScanMetricsByRegion(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+ Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
+ int rowsScannedAcrossAllRegions = 0;
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
+ rowsScannedAcrossAllRegions++;
+ } else {
+ assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+ Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
+ }
+
+ @Test
+ public void testScanMetricsByRegionReset() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ scan.setEnableScanMetricsByRegion(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+
+ // Retrieve scan metrics by region as a map and reset
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion();
+ // We scan 1 row per region
+ Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+
+ // Scan metrics have already been reset and now all counters should be 0
+ scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
+ // Size of map should be same as earlier
+ Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ // Counters should have been reset to 0
+ Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+
+ @Test
+ public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
+ TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
+ + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();
+
+ // Trigger two concurrent threads, one of which scans the table while the other periodically
+ // collects the scan metrics (along with resetting the counters to 0).
+ Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setCaching(2);
+ try (ResultScanner rs = table.getScanner(scan)) {
+ ScanMetrics scanMetrics = rs.getScanMetrics();
+ AtomicInteger rowsScanned = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(1);
+ Runnable tableScanner = new Runnable() {
+ public void run() {
+ for (Result r : rs) {
+ Assert.assertFalse(r.isEmpty());
+ rowsScanned.incrementAndGet();
+ }
+ latch.countDown();
+ }
+ };
+ Runnable metricsCollector =
+ getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
+ executor.execute(tableScanner);
+ executor.execute(metricsCollector);
+ latch.await();
+ // Merge leftover scan metrics
+ mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
+ concurrentScanMetricsByRegion);
+ Assert.assertEquals(HBaseTestingUtility.ROWS.length, rowsScanned.get());
+ }
+
+ Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;
+
+ // Collect scan metrics by region from a single thread. Assert that concurrent scan
+ // and metrics collection works as expected.
+ scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setCaching(2);
+ try (ResultScanner rs = table.getScanner(scan)) {
+ ScanMetrics scanMetrics = rs.getScanMetrics();
+ int rowsScanned = 0;
+ for (Result r : rs) {
+ Assert.assertFalse(r.isEmpty());
+ rowsScanned++;
+ }
+ Assert.assertEquals(HBaseTestingUtility.ROWS.length, rowsScanned);
+ expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ // Remove millis between nexts metric as it is not deterministic
+ metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ // Each region will have 26 * 26 + 26 + 1 rows except the last region which will have 1 row
+ long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ Assert.assertTrue(
+ rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
+ }
+ }
+
+ // Assert on scan metrics by region
+ Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithRegionMove() throws Exception {
+ TableName tableName = TableName.valueOf(
+ TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ // Scan 2 regions with start row keys: bbb and ccc
+ byte[] bbb = Bytes.toBytes("bbb");
+ byte[] ccc = Bytes.toBytes("ccc");
+ byte[] ddc = Bytes.toBytes("ddc");
+ long expectedCountOfRowsScannedInMovedRegion = 0;
+ // ROWS is the data loaded by loadTable()
+ for (byte[] row : HBaseTestingUtility.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
+ expectedCountOfRowsScannedInMovedRegion++;
+ }
+ }
+ byte[] movedRegion = null;
+ ScanMetrics scanMetrics;
+
+ // Initialize scan with maxResultSize set to approximately the size of 50 rows.
+ Scan scan = generateScan(bbb, ddc);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(8000);
+
+ try (ResultScanner rs = table.getScanner(scan)) {
+ boolean isFirstScanOfRegion = true;
+ for (Result r : rs) {
+ byte[] row = r.getRow();
+ if (isFirstScanOfRegion) {
+ movedRegion = moveRegion(tableName, row);
+ isFirstScanOfRegion = false;
+ }
+ }
+ Assert.assertNotNull(movedRegion);
+
+ scanMetrics = rs.getScanMetrics();
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion();
+ long actualCountOfRowsScannedInMovedRegion = 0;
+ Set<ServerName> serversForMovedRegion = new HashSet<>();
+
+ // 2 regions scanned, with two entries for the first region as it moved mid-scan
+ Assert.assertEquals(3, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
+ long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ actualCountOfRowsScannedInMovedRegion += rowsScanned;
+ serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());
+
+ Assert.assertTrue(metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1
+ || metricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME) == 1);
+ }
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion,
+ actualCountOfRowsScannedInMovedRegion);
+ Assert.assertEquals(2, serversForMovedRegion.size());
+ }
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithRegionSplit() throws Exception {
+ TableName tableName = TableName.valueOf(
+ TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ // Scan 1 region with start row key: bbb
+ byte[] bbb = Bytes.toBytes("bbb");
+ byte[] bmw = Bytes.toBytes("bmw");
+ byte[] ccb = Bytes.toBytes("ccb");
+ long expectedCountOfRowsScannedInRegion = 0;
+ // ROWS is the data loaded by loadTable()
+ for (byte[] row : HBaseTestingUtility.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
+ expectedCountOfRowsScannedInRegion++;
+ }
+ }
+ ScanMetrics scanMetrics;
+ Set<String> expectedSplitRegionRes = new HashSet<>();
+
+ // Initialize scan
+ Scan scan = generateScan(bbb, ccb);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(8000);
+
+ try (ResultScanner rs = table.getScanner(scan)) {
+ boolean isFirstScanOfRegion = true;
+ for (Result r : rs) {
+ if (isFirstScanOfRegion) {
+ splitRegion(tableName, bbb, bmw)
+ .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
+ isFirstScanOfRegion = false;
+ }
+ }
+
+ scanMetrics = rs.getScanMetrics();
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion();
+
+ long actualCountOfRowsScannedInRegion = 0;
+ long rpcRetriesCount = 0;
+ long notServingRegionExceptionCount = 0;
+ Set<String> splitRegionRes = new HashSet<>();
+
+ // 1 entry each for parent and two child regions
+ Assert.assertEquals(3, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ actualCountOfRowsScannedInRegion += rowsScanned;
+ splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());
+
+ if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
+ rpcRetriesCount++;
+ }
+ if (metricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME) == 1) {
+ notServingRegionExceptionCount++;
+ }
+
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
+ Assert.assertEquals(1, rpcRetriesCount);
+ Assert.assertEquals(1, notServingRegionExceptionCount);
+ Assert.assertEquals(expectedSplitRegionRes, splitRegionRes);
+ }
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithRegionMerge() throws Exception {
+ TableName tableName = TableName.valueOf(
+ TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ // Scan 2 regions with start row keys: bbb and ccc
+ byte[] bbb = Bytes.toBytes("bbb");
+ byte[] ccc = Bytes.toBytes("ccc");
+ byte[] ddc = Bytes.toBytes("ddc");
+ long expectedCountOfRowsScannedInRegions = 0;
+ // ROWS is the data loaded by loadTable()
+ for (byte[] row : HBaseTestingUtility.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
+ expectedCountOfRowsScannedInRegions++;
+ }
+ }
+ ScanMetrics scanMetrics;
+ Set<String> expectedMergeRegionsRes = new HashSet<>();
+ String mergedRegionEncodedName = null;
+
+ // Initialize scan
+ Scan scan = generateScan(bbb, ddc);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(8000);
+
+ try (ResultScanner rs = table.getScanner(scan)) {
+ boolean isFirstScanOfRegion = true;
+ for (Result r : rs) {
+ if (isFirstScanOfRegion) {
+ List<byte[]> out = mergeRegions(tableName, bbb, ccc);
+ // Entry with index 2 is the encoded region name of merged region
+ mergedRegionEncodedName = Bytes.toString(out.get(2));
+ out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
+ isFirstScanOfRegion = false;
+ }
+ }
+
+ scanMetrics = rs.getScanMetrics();
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion();
+ long actualCountOfRowsScannedInRegions = 0;
+ Set<String> mergeRegionsRes = new HashSet<>();
+ boolean containsMergedRegionInScanMetrics = false;
+
+ // 1 entry each for the old region from which the first row was scanned and the new merged region
+ Assert.assertEquals(2, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ actualCountOfRowsScannedInRegions += rowsScanned;
+ mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
+ if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
+ containsMergedRegionInScanMetrics = true;
+ Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
+ } else {
+ Assert.assertEquals(1, (long) metricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME));
+ }
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
+ Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
+ Assert.assertTrue(containsMergedRegionInScanMetrics);
+ }
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
+ CountDownLatch latch) {
+ return new Runnable() {
+ public void run() {
+ try {
+ while (latch.getCount() > 0) {
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion();
+ mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
+ Thread.sleep(RAND.nextInt(10));
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
+ Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ // Remove millis between nexts metric as it is not deterministic
+ metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+ if (dstMap.containsKey(scanMetricsRegionInfo)) {
+ Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
+ for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
+ String metricName = metricEntry.getKey();
+ Long existingValue = dstMetricsMap.get(metricName);
+ Long newValue = metricEntry.getValue();
+ dstMetricsMap.put(metricName, existingValue + newValue);
+ }
+ } else {
+ dstMap.put(scanMetricsRegionInfo, metricsMap);
+ }
+ }
+ }
+
+ /**
+ * Moves the region with the given start row key from its original region server to some other
+ * region server. This is a synchronous method.
+ * @param tableName Table name to which the region to be moved belongs.
+ * @param startRow Start row key of the region to be moved.
+ * @return Encoded region name of the region which was moved.
+ */
+ private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ RegionLocator regionLocator = CONN.getRegionLocator(tableName);
+ HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
+ byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
+ ServerName initialServerName = loc.getServerName();
+
+ admin.move(encodedRegionName);
+
+ ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();
+
+ // Assert that region actually moved
+ Assert.assertNotEquals(initialServerName, finalServerName);
+ return encodedRegionName;
+ }
+
+ /**
+ * Splits the region with start row key at the split key provided. This is a synchronous method.
+ * @param tableName Table name of region to be split.
+ * @param startRow Start row key of the region to be split.
+ * @param splitKey Split key for splitting the region.
+ * @return List of encoded region names with first element being parent region followed by two
+ * child regions.
+ */
+ private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
+ throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ RegionLocator regionLocator = CONN.getRegionLocator(tableName);
+ HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
+ byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ ServerName initialTopServerName = topLoc.getServerName();
+ HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+ byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+ ServerName initialBottomServerName = bottomLoc.getServerName();
+
+ // Assert region is ready for split
+ Assert.assertEquals(initialTopServerName, initialBottomServerName);
+ Assert.assertTrue(Bytes.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName));
+
+ FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
+
+ topLoc = regionLocator.getRegionLocation(startRow, true);
+ byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+ byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+ // Assert that region split is complete
+ Assert.assertFalse(Bytes.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName));
+ Assert.assertFalse(Bytes.equals(initialEncodedTopRegionName, finalEncodedBottomRegionName));
+ Assert.assertFalse(Bytes.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));
+
+ return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
+ finalEncodedBottomRegionName);
+ }
+
+ /**
+ * Merges two regions with the start row key as topRegion and bottomRegion. Ensures that the
+ * regions to be merged are adjacent regions. This is a synchronous method.
+ * @param tableName Table name of regions to be merged.
+ * @param topRegion Start row key of first region for merging.
+ * @param bottomRegion Start row key of second region for merging.
+ * @return List of encoded region names with first two elements being original regions followed by
+ * the merged region.
+ */
+ private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
+ throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ RegionLocator regionLocator = CONN.getRegionLocator(tableName);
+ HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
+ byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
+ HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+ byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+ String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
+
+ // Assert that regions are ready to be merged
+ Assert.assertFalse(Bytes.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName));
+ Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
+
+ FutureUtils.get(admin.mergeRegionsAsync(
+ new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
+
+ topLoc = regionLocator.getRegionLocation(topRegion, true);
+ byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+ byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+ // Assert regions have been merged successfully
+ Assert.assertTrue(Bytes.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName));
+ Assert.assertFalse(Bytes.equals(initialEncodedTopRegionName, finalEncodedTopRegionName));
+ Assert.assertFalse(Bytes.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));
+
+ return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
+ finalEncodedTopRegionName);
+ }
+}
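The concurrent test above depends on collectMetricsByRegion() resetting the counters on every call, so each call returns only the delta since the previous one and a periodic collector must accumulate the deltas itself. A minimal sketch of that accumulation (variable names illustrative):

    Map<ScanMetricsRegionInfo, Map<String, Long>> totals = new HashMap<>();
    Map<ScanMetricsRegionInfo, Map<String, Long>> delta =
      scanMetrics.collectMetricsByRegion(); // no-arg form resets the counters
    delta.forEach((region, metrics) -> metrics.forEach((name, value) ->
      totals.computeIfAbsent(region, k -> new HashMap<>()).merge(name, value, Long::sum)));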
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 173e6d0683c..06373cda402 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -33,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -243,6 +249,84 @@ public class TestTableSnapshotScanner {
testScanner(UTIL, "testWithMultiRegion", 20, true);
}
+ private ScanMetrics createTableSnapshotScannerAndGetScanMetrics(boolean enableScanMetrics,
+ boolean enableScanMetricsByRegion, byte[] endKey) throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName() + "_TABLE");
+ String snapshotName = name.getMethodName() + "_SNAPSHOT";
+ try {
+ createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
+ Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+ Scan scan = new Scan().withStartRow(bbb).withStopRow(endKey);
+ scan.setScanMetricsEnabled(enableScanMetrics);
+ scan.setEnableScanMetricsByRegion(enableScanMetricsByRegion);
+ Configuration conf = UTIL.getConfiguration();
+
+ TableSnapshotScanner snapshotScanner =
+ new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
+ verifyScanner(snapshotScanner, bbb, endKey);
+ return snapshotScanner.getScanMetrics();
+ } finally {
+ UTIL.getAdmin().deleteSnapshot(snapshotName);
+ UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsDisabled() throws Exception {
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(false, false, yyy);
+ Assert.assertNull(scanMetrics);
+ }
+
+ @Test
+ public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, false, yyy);
+ Assert.assertNotNull(scanMetrics);
+ int rowsScanned = 0;
+ for (byte[] row : HBaseTestingUtility.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, yyy) < 0) {
+ rowsScanned++;
+ }
+ }
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+ Assert.assertEquals(rowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+
+ @Test
+ public void testScanMetricsByRegionForSingleRegion() throws Exception {
+ // Scan single row with row key bbb
+ byte[] bbc = Bytes.toBytes("bbc");
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, true, bbc);
+ Assert.assertNotNull(scanMetrics);
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion();
+ Assert.assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionForMultiRegion() throws Exception {
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, true, yyy);
+ Assert.assertNotNull(scanMetrics);
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion();
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ }
+
@Test
public void testScannerWithRestoreScanner() throws Exception {
TableName tableName = TableName.valueOf("testScanner");