This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 7988cdb89af HBASE-29233: Capture scan metrics at region level (#6868)
7988cdb89af is described below
commit 7988cdb89afd66a5e7fea52ded71fc006aca0ee0
Author: sanjeet006py <[email protected]>
AuthorDate: Fri Jun 27 00:52:32 2025 +0530
HBASE-29233: Capture scan metrics at region level (#6868)
Signed-off-by: Viraj Jasani <[email protected]>
---
.gitignore | 1 +
.../hadoop/hbase/client/AbstractClientScanner.java | 12 +
.../hadoop/hbase/client/AsyncClientScanner.java | 15 +
.../hadoop/hbase/client/ConnectionUtils.java | 14 +-
.../apache/hadoop/hbase/client/ImmutableScan.java | 11 +
.../java/org/apache/hadoop/hbase/client/Scan.java | 26 +-
.../client/metrics/RegionScanMetricsData.java | 77 +++
.../hadoop/hbase/client/metrics/ScanMetrics.java | 16 +-
.../client/metrics/ScanMetricsRegionInfo.java | 81 +++
.../hbase/client/metrics/ScanMetricsUtil.java | 88 +++
.../client/metrics/ServerSideScanMetrics.java | 121 +++-
.../hbase/client/ClientSideRegionScanner.java | 11 +-
.../hadoop/hbase/client/TableSnapshotScanner.java | 3 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 9 +-
.../hbase/regionserver/RegionScannerImpl.java | 7 +-
.../hbase/client/TestAsyncTableScanMetrics.java | 111 +++-
...AsyncTableScanMetricsWithScannerSuspending.java | 162 +++++
.../hbase/client/TestClientSideRegionScanner.java | 85 +++
.../hadoop/hbase/client/TestReplicasClient.java | 75 +++
.../hadoop/hbase/client/TestScanAttributes.java | 50 ++
.../hadoop/hbase/client/TestTableScanMetrics.java | 697 +++++++++++++++++++++
.../hbase/client/TestTableSnapshotScanner.java | 84 +++
22 files changed, 1701 insertions(+), 55 deletions(-)
diff --git a/.gitignore b/.gitignore
index 52d169dd5ad..da26fb98456 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,4 @@ linklint/
**/*.log
tmp
**/.flattened-pom.xml
+.vscode/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 48cec12f43c..ece657deb0a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public abstract class AbstractClientScanner implements ResultScanner {
protected ScanMetrics scanMetrics;
+ private boolean isScanMetricsByRegionEnabled = false;
/**
* Check and initialize if application wants to collect scan metrics
@@ -34,6 +35,9 @@ public abstract class AbstractClientScanner implements ResultScanner {
// check if application wants to collect scan metrics
if (scan.isScanMetricsEnabled()) {
scanMetrics = new ScanMetrics();
+ if (scan.isScanMetricsByRegionEnabled()) {
+ isScanMetricsByRegionEnabled = true;
+ }
}
}
@@ -46,4 +50,12 @@ public abstract class AbstractClientScanner implements ResultScanner {
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
+
+ protected void setIsScanMetricsByRegionEnabled(boolean isScanMetricsByRegionEnabled) {
+ this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
+ }
+
+ protected boolean isScanMetricsByRegionEnabled() {
+ return isScanMetricsByRegionEnabled;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index b61f5b80c9e..d58c8e60c8d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -95,6 +95,8 @@ class AsyncClientScanner {
private final Map<String, byte[]> requestAttributes;
+ private final boolean isScanMetricsByRegionEnabled;
+
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long pauseNsForServerOverloaded,
int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt,
@@ -118,12 +120,17 @@ class AsyncClientScanner {
this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = createScanResultCache(scan);
this.requestAttributes = requestAttributes;
+ boolean isScanMetricsByRegionEnabled = false;
if (scan.isScanMetricsEnabled()) {
this.scanMetrics = new ScanMetrics();
consumer.onScanMetricsCreated(scanMetrics);
+ if (this.scan.isScanMetricsByRegionEnabled()) {
+ isScanMetricsByRegionEnabled = true;
+ }
} else {
this.scanMetrics = null;
}
+ this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
/*
* Assumes that the `start()` method is called immediately after construction. If this is no
@@ -250,6 +257,9 @@ class AsyncClientScanner {
}
private void openScanner() {
+ if (this.isScanMetricsByRegionEnabled) {
+ scanMetrics.moveToNextRegion();
+ }
incRegionCountMetrics(scanMetrics);
openScannerTries.set(1);
addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
@@ -265,6 +275,11 @@ class AsyncClientScanner {
span.end();
}
}
+ if (this.isScanMetricsByRegionEnabled) {
+ HRegionLocation loc = resp.loc;
+ this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
+ loc.getServerName());
+ }
startScan(resp);
}
});
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index cdb85845f03..40e205ddca8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -341,9 +341,9 @@ public final class ConnectionUtils {
if (scanMetrics == null) {
return;
}
- scanMetrics.countOfRPCcalls.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.RPC_CALLS_METRIC_NAME, 1);
if (isRegionServerRemote) {
- scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME, 1);
}
}
@@ -351,9 +351,9 @@ public final class ConnectionUtils {
if (scanMetrics == null) {
return;
}
- scanMetrics.countOfRPCRetries.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.RPC_RETRIES_METRIC_NAME, 1);
if (isRegionServerRemote) {
- scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_RETRIES_METRIC_NAME, 1);
}
}
@@ -368,9 +368,9 @@ public final class ConnectionUtils {
resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
}
}
- scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+ scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize);
if (isRegionServerRemote) {
- scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
+ scanMetrics.addToCounter(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME, resultSize);
}
}
@@ -390,7 +390,7 @@ public final class ConnectionUtils {
if (scanMetrics == null) {
return;
}
- scanMetrics.countOfRegions.incrementAndGet();
+ scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
}
/**
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
index 128d46daac4..eafab0e7fd8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
@@ -228,6 +228,12 @@ public final class ImmutableScan extends Scan {
"ImmutableScan does not allow access to setScanMetricsEnabled");
}
+ @Override
+ public Scan setEnableScanMetricsByRegion(final boolean enable) {
+ throw new UnsupportedOperationException(
+ "ImmutableScan does not allow access to setEnableScanMetricsByRegion");
+ }
+
@Override
@Deprecated
public Scan setAsyncPrefetch(boolean asyncPrefetch) {
@@ -402,6 +408,11 @@ public final class ImmutableScan extends Scan {
return this.delegateScan.isScanMetricsEnabled();
}
+ @Override
+ public boolean isScanMetricsByRegionEnabled() {
+ return this.delegateScan.isScanMetricsByRegionEnabled();
+ }
+
@Override
public Boolean isAsyncPrefetch() {
return this.delegateScan.isAsyncPrefetch();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 62a65e4e6e1..5cc0d0fac87 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -115,6 +115,8 @@ public class Scan extends Query {
// define this attribute with the appropriate table name by calling
// scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
+ static private final String SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE =
+ "scan.attributes.metrics.byregion.enable";
/**
* -1 means no caching specified and the value of {@link HConstants#HBASE_CLIENT_SCANNER_CACHING}
@@ -905,11 +907,15 @@ public class Scan extends Query {
}
/**
- * Enable collection of {@link ScanMetrics}. For advanced users.
+ * Enable collection of {@link ScanMetrics}. For advanced users. Disabling scan metrics will
+ * also disable region level scan metrics.
* @param enabled Set to true to enable accumulating scan metrics
*/
public Scan setScanMetricsEnabled(final boolean enabled) {
setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, Bytes.toBytes(Boolean.valueOf(enabled)));
+ if (!enabled) {
+ setEnableScanMetricsByRegion(false);
+ }
return this;
}
@@ -1033,4 +1039,22 @@ public class Scan extends Query {
public static Scan createScanFromCursor(Cursor cursor) {
return new Scan().withStartRow(cursor.getRow());
}
+
+ /**
+ * Enables region level scan metrics. If scan metrics are disabled, this first enables scan
+ * metrics and then region level scan metrics.
+ * @param enable Set to true to enable region level scan metrics.
+ */
+ public Scan setEnableScanMetricsByRegion(final boolean enable) {
+ if (enable) {
+ setScanMetricsEnabled(true);
+ }
+ setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE, Bytes.toBytes(enable));
+ return this;
+ }
+
+ public boolean isScanMetricsByRegionEnabled() {
+ byte[] attr = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE);
+ return attr != null && Bytes.toBoolean(attr);
+ }
}
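
For reference, a minimal usage sketch of the new Scan API above, assuming a Table obtained from a
regular Connection (the `table` parameter and the method name are placeholders, not part of this
patch):

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
    import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;

    static void scanWithRegionLevelMetrics(Table table) throws IOException {
      // Enabling region level metrics implicitly enables overall scan metrics too.
      Scan scan = new Scan().setEnableScanMetricsByRegion(true);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          // consume results
        }
        ScanMetrics metrics = scanner.getScanMetrics();
        // Metrics grouped per region, keyed by encoded region name plus server name.
        Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion = metrics.collectMetricsByRegion();
      }
    }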
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
new file mode 100644
index 00000000000..ca6a111175e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Captures region level scan metrics as a map of metric name ({@link String}) -> Value
+ * ({@link AtomicLong}). <br/>
+ * <br/>
+ * One instance stores scan metrics for a single region only.
+ */
+@InterfaceAudience.Private
+public class RegionScanMetricsData {
+ private final Map<String, AtomicLong> counters = new HashMap<>();
+ private ScanMetricsRegionInfo scanMetricsRegionInfo =
+ ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO;
+
+ AtomicLong createCounter(String counterName) {
+ return ScanMetricsUtil.createCounter(counters, counterName);
+ }
+
+ void setCounter(String counterName, long value) {
+ ScanMetricsUtil.setCounter(counters, counterName, value);
+ }
+
+ void addToCounter(String counterName, long delta) {
+ ScanMetricsUtil.addToCounter(counters, counterName, delta);
+ }
+
+ Map<String, Long> collectMetrics(boolean reset) {
+ return ScanMetricsUtil.collectMetrics(counters, reset);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[" + scanMetricsRegionInfo + "," + "Counters=" + counters
+ + "]";
+ }
+
+ /**
+ * Populate encoded region name and server name details if not already populated. If details are
+ * already populated and a re-attempt is done then {@link UnsupportedOperationException} is
+ * thrown.
+ */
+ void initScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+ // Check by reference
+ if (scanMetricsRegionInfo == ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO) {
+ scanMetricsRegionInfo = new ScanMetricsRegionInfo(encodedRegionName, serverName);
+ } else {
+ throw new UnsupportedOperationException("ScanMetricsRegionInfo has already been initialized");
+ }
+ }
+
+ ScanMetricsRegionInfo getScanMetricsRegionInfo() {
+ return scanMetricsRegionInfo;
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
index e2038a17b37..8617a851509 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
@@ -96,8 +96,22 @@ public class ScanMetrics extends ServerSideScanMetrics {
public final AtomicLong countOfRemoteRPCRetries = createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
/**
- * constructor
+ * Constructor
*/
public ScanMetrics() {
}
+
+ @Override
+ public void moveToNextRegion() {
+ super.moveToNextRegion();
+ currentRegionScanMetricsData.createCounter(RPC_CALLS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(REMOTE_RPC_CALLS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BYTES_IN_RESULTS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(REGIONS_SCANNED_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(RPC_RETRIES_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
+ }
}
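
The override above fills in the client-side counters for each new per-region bucket. The call
order the scanners drive (see the AsyncClientScanner and ClientSideRegionScanner hunks) is roughly
the sketch below; the encoded region name and ServerName values are placeholders:

    ScanMetrics scanMetrics = new ScanMetrics();
    // Before scanning each region: allocate a fresh per-region counter set.
    scanMetrics.moveToNextRegion();
    // Once the region location is known: record its identity, exactly once per region.
    scanMetrics.initScanMetricsRegionInfo("0123abcd",
      ServerName.valueOf("host1,16020,1700000000000"));
    // Subsequent updates land in both the overall and the current region's counters.
    scanMetrics.addToCounter(ScanMetrics.RPC_CALLS_METRIC_NAME, 1);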
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
new file mode 100644
index 00000000000..72fb0ab418d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * POJO for capturing region level details when region level scan metrics are enabled. <br>
+ * <br>
+ * Currently, encoded region name and server name (host name, ports and startcode) are captured as
+ * region details. <br>
+ * <br>
+ * Instance of this class serves as key in the Map returned by
+ * {@link ServerSideScanMetrics#collectMetricsByRegion()} or
+ * {@link ServerSideScanMetrics#collectMetricsByRegion(boolean)}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ScanMetricsRegionInfo {
+ /**
+ * Users should only compare against this constant by reference and should not make any
+ * assumptions regarding content of the constant.
+ */
+ public static final ScanMetricsRegionInfo EMPTY_SCAN_METRICS_REGION_INFO =
+ new ScanMetricsRegionInfo(null, null);
+
+ private final String encodedRegionName;
+ private final ServerName serverName;
+
+ ScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+ this.encodedRegionName = encodedRegionName;
+ this.serverName = serverName;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof ScanMetricsRegionInfo other)) {
+ return false;
+ }
+ return new EqualsBuilder().append(encodedRegionName, other.encodedRegionName)
+ .append(serverName, other.serverName).isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37).append(encodedRegionName).append(serverName).toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getSimpleName() + "[encodedRegionName=" + encodedRegionName + ",serverName="
+ + serverName + "]";
+ }
+
+ public String getEncodedRegionName() {
+ return encodedRegionName;
+ }
+
+ public ServerName getServerName() {
+ return serverName;
+ }
+}
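
Since equals()/hashCode() are value-based on the (encodedRegionName, serverName) pair, two
instances describing the same region compare equal, which is what makes the class usable as a map
key. A small sketch (the constructor is package-private, so this only compiles inside
org.apache.hadoop.hbase.client.metrics; the values are placeholders):

    ServerName sn = ServerName.valueOf("host1,16020,1700000000000");
    ScanMetricsRegionInfo a = new ScanMetricsRegionInfo("e1", sn);
    ScanMetricsRegionInfo b = new ScanMetricsRegionInfo("e1", sn);
    assert a.equals(b) && a.hashCode() == b.hashCode();
    // The EMPTY_SCAN_METRICS_REGION_INFO sentinel, by contrast, is meant to be
    // compared by reference only, as its javadoc warns.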
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
new file mode 100644
index 00000000000..2e1dfb8a3c4
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ScanMetricsUtil {
+
+ private ScanMetricsUtil() {
+ }
+
+ /**
+ * Creates a new counter with the specified name and stores it in the counters map.
+ * @return {@link AtomicLong} instance for the counter with counterName
+ */
+ static AtomicLong createCounter(Map<String, AtomicLong> counters, String counterName) {
+ AtomicLong c = new AtomicLong(0);
+ counters.put(counterName, c);
+ return c;
+ }
+
+ /**
+ * Sets counter with counterName to passed in value, does nothing if counter does not exist.
+ */
+ static void setCounter(Map<String, AtomicLong> counters, String counterName, long value) {
+ AtomicLong c = counters.get(counterName);
+ if (c != null) {
+ c.set(value);
+ }
+ }
+
+ /**
+ * Increments the counter with counterName by delta, does nothing if counter does not exist.
+ */
+ static void addToCounter(Map<String, AtomicLong> counters, String counterName, long delta) {
+ AtomicLong c = counters.get(counterName);
+ if (c != null) {
+ c.addAndGet(delta);
+ }
+ }
+
+ /**
+ * Returns true if a counter exists with the counterName.
+ */
+ static boolean hasCounter(Map<String, AtomicLong> counters, String counterName) {
+ return counters.containsKey(counterName);
+ }
+
+ /**
+ * Returns {@link AtomicLong} instance for this counter name, null if counter does not exist.
+ */
+ static AtomicLong getCounter(Map<String, AtomicLong> counters, String counterName) {
+ return counters.get(counterName);
+ }
+
+ /**
+ * Get all the values. If reset is true, we will reset all the AtomicLongs back to 0.
+ * @param reset whether to reset the AtomicLongs to 0.
+ * @return A Map of String -> Long for metrics
+ */
+ static Map<String, Long> collectMetrics(Map<String, AtomicLong> counters, boolean reset) {
+ Map<String, Long> metricsSnapshot = new HashMap<>();
+ for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
+ long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
+ metricsSnapshot.put(e.getKey(), value);
+ }
+ return metricsSnapshot;
+ }
+}
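
The reset semantics of collectMetrics() are what let callers take repeated deltas rather than
cumulative totals. A sketch of the behavior (these methods are package-private, so again only
callable from within the metrics package; the counter name is arbitrary):

    Map<String, AtomicLong> counters = new HashMap<>();
    ScanMetricsUtil.createCounter(counters, "ROWS_SCANNED");
    ScanMetricsUtil.addToCounter(counters, "ROWS_SCANNED", 5);
    // A snapshot with reset=true returns {ROWS_SCANNED=5} and zeroes the live counter.
    Map<String, Long> snapshot = ScanMetricsUtil.collectMetrics(counters, true);
    assert counters.get("ROWS_SCANNED").get() == 0;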
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index ff83584ccb4..ecc2d9bee14 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.client.metrics;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -31,18 +35,31 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@SuppressWarnings("checkstyle:VisibilityModifier") // See HBASE-27757
public class ServerSideScanMetrics {
/**
- * Hash to hold the String -> Atomic Long mappings for each metric
+ * Hash to hold the String -&gt; Atomic Long mappings for each metric
*/
private final Map<String, AtomicLong> counters = new HashMap<>();
+ private final List<RegionScanMetricsData> regionScanMetricsData = new ArrayList<>(0);
+ protected RegionScanMetricsData currentRegionScanMetricsData = null;
/**
- * Create a new counter with the specified name
+ * If region level scan metrics are enabled, must call this method to start collecting metrics for
+ * the region before scanning the region.
+ */
+ public void moveToNextRegion() {
+ currentRegionScanMetricsData = new RegionScanMetricsData();
+ regionScanMetricsData.add(currentRegionScanMetricsData);
+ currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME);
+ currentRegionScanMetricsData.createCounter(FS_READ_TIME_METRIC_NAME);
+ }
+
+ /**
+ * Create a new counter with the specified name.
* @return {@link AtomicLong} instance for the counter with counterName
*/
protected AtomicLong createCounter(String counterName) {
- AtomicLong c = new AtomicLong(0);
- counters.put(counterName, c);
- return c;
+ return ScanMetricsUtil.createCounter(counters, counterName);
}
public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = "ROWS_SCANNED";
@@ -69,52 +86,106 @@ public class ServerSideScanMetrics {
public final AtomicLong fsReadTime = createCounter(FS_READ_TIME_METRIC_NAME);
+ /**
+ * Sets counter with counterName to passed in value, does nothing if counter does not exist. If
+ * region level scan metrics are enabled then sets the value of counter for the current region
+ * being scanned.
+ */
public void setCounter(String counterName, long value) {
- AtomicLong c = this.counters.get(counterName);
- if (c != null) {
- c.set(value);
+ ScanMetricsUtil.setCounter(counters, counterName, value);
+ if (this.currentRegionScanMetricsData != null) {
+ this.currentRegionScanMetricsData.setCounter(counterName, value);
}
}
- /** Returns true if a counter exists with the counterName */
+ /**
+ * Returns true if a counter exists with the counterName.
+ */
public boolean hasCounter(String counterName) {
- return this.counters.containsKey(counterName);
+ return ScanMetricsUtil.hasCounter(counters, counterName);
}
- /** Returns {@link AtomicLong} instance for this counter name, null if counter does not exist. */
+ /**
+ * Returns {@link AtomicLong} instance for this counter name, null if counter does not exist.
+ */
public AtomicLong getCounter(String counterName) {
- return this.counters.get(counterName);
+ return ScanMetricsUtil.getCounter(counters, counterName);
}
+ /**
+ * Increments the counter with counterName by delta, does nothing if counter does not exist. If
+ * region level scan metrics are enabled then increments the counter corresponding to the current
+ * region being scanned. Please see {@link #moveToNextRegion()}.
+ */
public void addToCounter(String counterName, long delta) {
- AtomicLong c = this.counters.get(counterName);
- if (c != null) {
- c.addAndGet(delta);
+ ScanMetricsUtil.addToCounter(counters, counterName, delta);
+ if (this.currentRegionScanMetricsData != null) {
+ this.currentRegionScanMetricsData.addToCounter(counterName, delta);
}
}
/**
- * Get all of the values since the last time this function was called. Calling this function will
- * reset all AtomicLongs in the instance back to 0.
- * @return A Map of String -> Long for metrics
+ * Get all the values combined for all the regions since the last time this function was called.
+ * Calling this function will reset all AtomicLongs in the instance back to 0.
+ * @return A Map of String -&gt; Long for metrics
*/
public Map<String, Long> getMetricsMap() {
return getMetricsMap(true);
}
/**
- * Get all of the values. If reset is true, we will reset the all AtomicLongs back to 0.
+ * Get all the values combined for all the regions. If reset is true, we will reset all the
+ * AtomicLongs back to 0.
* @param reset whether to reset the AtomicLongs to 0.
- * @return A Map of String -> Long for metrics
+ * @return A Map of String -&gt; Long for metrics
*/
public Map<String, Long> getMetricsMap(boolean reset) {
+ return ImmutableMap.copyOf(ScanMetricsUtil.collectMetrics(counters, reset));
+ }
+
+ /**
+ * Get values grouped by each region scanned since the last time this was called. Calling this
+ * function will reset all region level scan metrics counters back to 0.
+ * @return A Map of region -> (Map of metric name -> Long) for metrics
+ */
+ public Map<ScanMetricsRegionInfo, Map<String, Long>> collectMetricsByRegion() {
+ return collectMetricsByRegion(true);
+ }
+
+ /**
+ * Get values grouped by each region scanned. If reset is true, will reset all the region level
+ * scan metrics counters back to 0.
+ * @param reset whether to reset region level scan metric counters to 0.
+ * @return A Map of region -> (Map of metric name -> Long) for metrics
+ */
+ public Map<ScanMetricsRegionInfo, Map<String, Long>> collectMetricsByRegion(boolean reset) {
// Create a builder
- ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
- for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
- long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
- builder.put(e.getKey(), value);
+ ImmutableMap.Builder<ScanMetricsRegionInfo, Map<String, Long>> builder = ImmutableMap.builder();
+ for (RegionScanMetricsData regionScanMetricsData : this.regionScanMetricsData) {
+ if (
+ regionScanMetricsData.getScanMetricsRegionInfo()
+ == ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO
+ ) {
+ continue;
+ }
+ builder.put(regionScanMetricsData.getScanMetricsRegionInfo(),
+ regionScanMetricsData.collectMetrics(reset));
}
- // Build the immutable map so that people can't mess around with it.
return builder.build();
}
+
+ @Override
+ public String toString() {
+ return counters + "," + regionScanMetricsData.stream().map(RegionScanMetricsData::toString)
+ .collect(Collectors.joining(","));
+ }
+
+ /**
+ * Call this method after calling {@link #moveToNextRegion()} to populate server name and encoded
+ * region name details for the region being scanned and for which metrics are being collected at
+ * the moment.
+ */
+ public void initScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+ currentRegionScanMetricsData.initScanMetricsRegionInfo(encodedRegionName, serverName);
+ }
}
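
A short consumer-side sketch of the grouped view added above, reading without resetting (the
`scanMetrics` variable is a placeholder; COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME is one of the
per-region counters registered in moveToNextRegion()):

    Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion =
      scanMetrics.collectMetricsByRegion(false);
    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> e : byRegion.entrySet()) {
      ScanMetricsRegionInfo info = e.getKey();
      long rows = e.getValue().getOrDefault(
        ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 0L);
      System.out.println(info.getEncodedRegionName() + "@" + info.getServerName() + " rows=" + rows);
    }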
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 144c01de874..df99fd40338 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.mob.MobFileCache;
@@ -90,6 +91,12 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
initScanMetrics(scan);
} else {
this.scanMetrics = scanMetrics;
+ setIsScanMetricsByRegionEnabled(scan.isScanMetricsByRegionEnabled());
+ }
+ if (isScanMetricsByRegionEnabled()) {
+ this.scanMetrics.moveToNextRegion();
+ this.scanMetrics.initScanMetricsRegionInfo(region.getRegionInfo().getEncodedName(), null);
+ // The server name will be null in scan metrics as this is a client side region scanner
}
region.startRegionOperation();
}
@@ -110,8 +117,8 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
for (Cell cell : values) {
resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
}
- this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
- this.scanMetrics.countOfRowsScanned.incrementAndGet();
+ this.scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, resultSize);
+ this.scanMetrics.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
}
return result;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
index c2bc0f08d10..41bd0bd988b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -187,7 +188,7 @@ public class TableSnapshotScanner extends AbstractClientScanner {
currentRegionScanner =
new ClientSideRegionScanner(conf, fs, restoreDir, htd, hri, scan, scanMetrics);
if (this.scanMetrics != null) {
- this.scanMetrics.countOfRegions.incrementAndGet();
+ this.scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d97318337e0..fa0c2fd3ff1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@@ -3516,10 +3517,12 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
if (trackMetrics) {
// rather than increment yet another counter in StoreScanner, just set the value here
// from block size progress before writing into the response
- scannerContext.getMetrics().countOfBlockBytesScanned
- .set(scannerContext.getBlockSizeProgress());
+ scannerContext.getMetrics().setCounter(
+ ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
+ scannerContext.getBlockSizeProgress());
if (rpcCall != null) {
- scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
+ scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
+ rpcCall.getFsReadTime());
}
Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index d7e4bb52a75..fed3bd1f992 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.ClientInternalHelper;
import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
@@ -645,7 +646,8 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
return;
}
- scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
+ scannerContext.getMetrics()
+ .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
}
private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
@@ -653,7 +655,8 @@ public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
return;
}
- scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
+ scannerContext.getMetrics()
+ .addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
}
/** Returns true when the joined heap may have data for the current row */
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
index 0f7258f3435..6678e94b7e0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -17,19 +17,28 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -93,12 +102,12 @@ public class TestAsyncTableScanMetrics {
@BeforeClass
public static void setUp() throws Exception {
UTIL.startMiniCluster(3);
- // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
- // scan are forced to hit all the regions.
+ // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
+ // the scan hits all the regions and not all rows lie in a single region
try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
- table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE),
- new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE),
- new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE)));
+ table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
}
CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
@@ -141,25 +150,101 @@ public class TestAsyncTableScanMetrics {
}
@Test
- public void testNoScanMetrics() throws Exception {
+ public void testScanMetricsDisabled() throws Exception {
Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
assertEquals(3, pair.getFirst().size());
+ // Assert no scan metrics
assertNull(pair.getSecond());
}
@Test
- public void testScanMetrics() throws Exception {
- Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan().setScanMetricsEnabled(true));
+ public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
+ Scan scan = new Scan();
+ scan.setScanMetricsEnabled(true);
+ Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
List<Result> results = pair.getFirst();
assertEquals(3, results.size());
- long bytes = results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
- .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
+ long bytes = getBytesOfResults(results);
+ ScanMetrics scanMetrics = pair.getSecond();
+ assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+ assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+ assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
+ // Assert scan metrics have not been collected by region
+ assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+ }
+
+ @Test
+ public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+ Scan scan = new Scan();
+ scan.withStartRow(Bytes.toBytes("zzz1"), true);
+ scan.withStopRow(Bytes.toBytes("zzz1"), true);
+ scan.setEnableScanMetricsByRegion(true);
+ Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+ List<Result> results = pair.getFirst();
+ assertEquals(1, results.size());
+ long bytes = getBytesOfResults(results);
+ ScanMetrics scanMetrics = pair.getSecond();
+ assertEquals(1, scanMetrics.countOfRegions.get());
+ assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+ assertEquals(1, scanMetrics.countOfRPCcalls.get());
+ // Assert scan metrics by region were collected for the region scanned
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo smri = entry.getKey();
+ Map<String, Long> metrics = entry.getValue();
+ assertNotNull(smri.getServerName());
+ assertNotNull(smri.getEncodedRegionName());
+ // Assert overall scan metrics and scan metrics by region should be equal as only 1 region
+ // was scanned.
+ assertEquals(scanMetrics.getMetricsMap(false), metrics);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+ Scan scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+ List<Result> results = pair.getFirst();
+ assertEquals(3, results.size());
+ long bytes = getBytesOfResults(results);
ScanMetrics scanMetrics = pair.getSecond();
+ Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+ assertEquals(NUM_REGIONS, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+ assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+ assertEquals(NUM_REGIONS, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
- // also assert a server side metric to ensure that we have published them into the client side
- // metrics.
- assertEquals(3, scanMetrics.countOfRowsScanned.get());
+ // Assert scan metrics by region were collected for the region scanned
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
+ int rowsScannedAcrossAllRegions = 0;
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo smri = entry.getKey();
+ Map<String, Long> perRegionMetrics = entry.getValue();
+ assertNotNull(smri.getServerName());
+ assertNotNull(smri.getEncodedRegionName());
+ assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+ if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
+ bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+ assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ rowsScannedAcrossAllRegions++;
+ } else {
+ assertEquals(0, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ assertEquals(0, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ }
+ }
+ assertEquals(3, rowsScannedAcrossAllRegions);
+ }
+
+ static long getBytesOfResults(List<Result> results) {
+ return results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
+ .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
new file mode 100644
index 00000000000..6a355b08365
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanMetricsWithScannerSuspending {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableScanMetricsWithScannerSuspending.class);
+
+ private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+ private static final TableName TABLE_NAME =
+ TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName());
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static AsyncConnection CONN;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniCluster(1);
+ // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*" so that
+ // the scan hits all the regions and not all rows lie in a single region
+ try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+ table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
+ }
+ CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ Closeables.close(CONN, true);
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithScannerSuspending() throws Exception {
+ // Setup scan
+ Scan scan = new Scan();
+ scan.withStartRow(Bytes.toBytes("xxx1"), true);
+ scan.withStopRow(Bytes.toBytes("zzz1"), true);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(1);
+
+ // Prepare scanner
+ final AtomicInteger rowsReadCounter = new AtomicInteger(0);
+ AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, scan, 1) {
+ @Override
+ public void onNext(Result[] results, ScanController controller) {
+ rowsReadCounter.addAndGet(results.length);
+ super.onNext(results, controller);
+ }
+ };
+
+ // Do the scan so that rows get loaded in the scanner (consumer)
+ CONN.getTable(TABLE_NAME).scan(scan, scanner);
+
+ List<Result> results = new ArrayList<>();
+ int expectedTotalRows = 3;
+ // Assert that only 1 row has been loaded so far as maxCacheSize is set to 1 byte
+ for (int i = 1; i <= expectedTotalRows; i++) {
+ UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return scanner.isSuspended();
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "The given scanner has been suspended in time";
+ }
+ });
+ assertTrue(scanner.isSuspended());
+ assertEquals(i, rowsReadCounter.get());
+ results.add(scanner.next());
+ }
+ Assert.assertNull(scanner.next());
+
+ // Assert on overall scan metrics and scan metrics by region
+ ScanMetrics scanMetrics = scanner.getScanMetrics();
+ // Assert on overall scan metrics
+ long bytes = getBytesOfResults(results);
+ Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+ assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+ assertEquals(bytes, (long) overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ // 1 Extra RPC call per region where no row is returned but moreResultsInRegion is set to false
+ assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
+ // Assert scan metrics by region were collected for the region scanned
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetrics.collectMetricsByRegion(false);
+ assertEquals(3, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo smri = entry.getKey();
+ Map<String, Long> perRegionMetrics = entry.getValue();
+ assertNotNull(smri.getServerName());
+ assertNotNull(smri.getEncodedRegionName());
+ assertEquals(1, (long) perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+ assertEquals(1, (long) perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+ assertEquals(bytes, (long) perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+ assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME));
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
index 6da74bf031d..253e61f995c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -37,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -179,6 +184,86 @@ public class TestClientSideRegionScanner {
}
}
+ @Test
+ public void testScanMetricsDisabled() throws IOException {
+ Configuration copyConf = new Configuration(conf);
+ Scan scan = new Scan();
+ try (ClientSideRegionScanner clientSideRegionScanner =
+ new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, null)) {
+ clientSideRegionScanner.next();
+ assertNull(clientSideRegionScanner.getScanMetrics());
+ }
+ }
+
+ private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics scanMetrics)
+ throws IOException {
+ Configuration copyConf = new Configuration(conf);
+ Scan scan = new Scan();
+ scan.setScanMetricsEnabled(true);
+ TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+ try (ClientSideRegionScanner clientSideRegionScanner =
+ new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
+ clientSideRegionScanner.next();
+ ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
+ assertNotNull(scanMetricsFromScanner);
+ if (scanMetrics != null) {
+ Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+ }
+ Map<String, Long> metricsMap = scanMetricsFromScanner.getMetricsMap(false);
+ Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
+ Assert.assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
+ }
+ }
+
+ @Test
+ public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() throws IOException {
+ testScanMetricsWithScanMetricsByRegionDisabled(null);
+ }
+
+ @Test
+ public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws IOException {
+ testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
+ }
+
+ private void testScanMetricByRegion(ScanMetrics scanMetrics) throws IOException {
+ Configuration copyConf = new Configuration(conf);
+ Scan scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+ try (ClientSideRegionScanner clientSideRegionScanner =
+ new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, scanMetrics)) {
+ clientSideRegionScanner.next();
+ ScanMetrics scanMetricsFromScanner = clientSideRegionScanner.getScanMetrics();
+ assertNotNull(scanMetricsFromScanner);
+ if (scanMetrics != null) {
+ Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+ }
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+ scanMetricsFromScanner.collectMetricsByRegion();
+ Assert.assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertEquals(hri.getEncodedName(), scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
+ Assert.assertEquals((long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
+ scanMetricsFromScanner.countOfRowsScanned.get());
+ }
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws IOException {
+ testScanMetricByRegion(null);
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithScanMetricsAsInput() throws IOException {
+ testScanMetricByRegion(new ScanMetrics());
+ }
+
private static Put createPut(int rowAsInt) {
byte[] row = Bytes.toBytes(rowAsInt);
Put put = new Put(row);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 02d80eb57f5..7f1dc69af2a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+
import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -34,9 +38,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -89,6 +95,7 @@ public class TestReplicasClient {
private static final byte[] f = HConstants.CATALOG_FAMILY;
private final static int REFRESH_PERIOD = 1000;
+ private static ServerName rsServerName;
/**
* This copro is used to synchronize the tests.
@@ -215,6 +222,9 @@ public class TestReplicasClient {
Configuration c = new Configuration(HTU.getConfiguration());
c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
LOG.info("Master has stopped");
+
+ rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
+ Assert.assertNotNull(rsServerName);
}
@AfterClass
@@ -615,4 +625,69 @@ public class TestReplicasClient {
closeRegion(hriSecondary);
}
}
+
+ @Test
+ public void testScanMetricsByRegion() throws Exception {
+ byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
+ openRegion(hriSecondary);
+
+ try {
+ Put p = new Put(b1);
+ p.addColumn(f, b1, b1);
+ table.put(p);
+ LOG.info("Put done");
+ flushRegion(hriPrimary);
+
+ // Sleep for 2 * REFRESH_PERIOD so that the flushed data is visible to the secondary replica
+ Thread.sleep(2 * REFRESH_PERIOD);
+
+ // Explicitly read replica 0
+ Scan scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ scan.withStartRow(b1, true);
+ scan.withStopRow(b1, true);
+ // Assert the row was read from the primary replica and verify the scan metrics by region
+ assertScanMetrics(scan, hriPrimary, false);
+ LOG.info("Scanned primary replica");
+
+ // Read from the region replica
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
+ scan = new Scan();
+ scan.setEnableScanMetricsByRegion(true);
+ scan.withStartRow(b1, true);
+ scan.withStopRow(b1, true);
+ scan.setConsistency(Consistency.TIMELINE);
+ // Assert the row was read from the secondary replica and verify the scan metrics by region
+ assertScanMetrics(scan, hriSecondary, true);
+ LOG.info("Scanned secondary replica");
+ } finally {
+ SlowMeCopro.getPrimaryCdl().get().countDown();
+ Delete d = new Delete(b1);
+ table.delete(d);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean isStale)
+ throws IOException {
+ try (ResultScanner rs = table.getScanner(scan)) {
+ for (Result r : rs) {
+ Assert.assertEquals(isStale, r.isStale());
+ Assert.assertFalse(r.isEmpty());
+ }
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = rs.getScanMetrics().collectMetricsByRegion(false);
+ Assert.assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metrics = entry.getValue();
+ Assert.assertEquals(rsServerName, scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(regionInfo.getEncodedName(),
+ scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(1, (long) metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+ }
}
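A note on what the replica test above relies on: with Consistency.TIMELINE the client may be served by a secondary replica, and the per-region metrics then attribute the rows to that replica's encoded region name and to the server that actually answered. A minimal sketch (illustrative only; the table and row variables are hypothetical):

// Illustrative sketch, not part of the patch.
Scan scan = new Scan().withStartRow(row, true).withStopRow(row, true);
scan.setConsistency(Consistency.TIMELINE); // permit reads from secondary replicas
scan.setEnableScanMetricsByRegion(true);
try (ResultScanner rs = table.getScanner(scan)) {
  rs.forEach(r -> System.out.println("stale=" + r.isStale()));
  // Each ScanMetricsRegionInfo key identifies the replica region (by encoded
  // name) and the server that actually served the scan.
  rs.getScanMetrics().collectMetricsByRegion(false)
    .forEach((info, m) -> System.out.println(info.getEncodedRegionName()
      + " @ " + info.getServerName() + " -> " + m));
}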
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
new file mode 100644
index 00000000000..36935511734
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestScanAttributes {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestScanAttributes.class);
+
+ @Test
+ public void testCoEnableAndCoDisableScanMetricsAndScanMetricsByRegion() {
+ Scan scan = new Scan();
+ Assert.assertFalse(scan.isScanMetricsEnabled());
+ Assert.assertFalse(scan.isScanMetricsByRegionEnabled());
+
+ // Assert that enabling scan metrics by region also enables scan metrics
+ scan.setEnableScanMetricsByRegion(true);
+ Assert.assertTrue(scan.isScanMetricsEnabled());
+ Assert.assertTrue(scan.isScanMetricsByRegionEnabled());
+
+ // Assert disabling scan metrics disables scan metrics by region
+ scan.setScanMetricsEnabled(false);
+ Assert.assertFalse(scan.isScanMetricsEnabled());
+ Assert.assertFalse(scan.isScanMetricsByRegionEnabled());
+ }
+}
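The coupling this small test pins down, condensed into a sketch (illustrative; the asserts stand in for the JUnit assertions and use only the accessors added by this commit):

Scan scan = new Scan();
scan.setEnableScanMetricsByRegion(true);
// Enabling by-region metrics implicitly enables overall scan metrics:
assert scan.isScanMetricsEnabled() && scan.isScanMetricsByRegionEnabled();
scan.setScanMetricsEnabled(false);
// Disabling overall scan metrics also disables by-region metrics:
assert !scan.isScanMetricsByRegionEnabled();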
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
new file mode 100644
index 00000000000..bcbf625f1e1
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ ClientTests.class, MediumTests.class })
+public class TestTableScanMetrics extends FromClientSideBase {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestTableScanMetrics.class);
+
+ private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+ private static final TableName TABLE_NAME =
+ TableName.valueOf(TestTableScanMetrics.class.getSimpleName());
+
+ private static final byte[] CF = Bytes.toBytes("cf");
+
+ private static final byte[] CQ = Bytes.toBytes("cq");
+
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ private static final Random RAND = new Random(11);
+
+ private static int NUM_REGIONS;
+
+ private static Connection CONN;
+
+ @Parameters(name = "{index}: scanner={0}")
+ public static List<Object[]> params() {
+ return Arrays.asList(new Object[] { "ForwardScanner", new Scan() },
+ new Object[] { "ReverseScanner", new Scan().setReversed(true) });
+ }
+
+ @Parameter(0)
+ public String scannerName;
+
+ @Parameter(1)
+ public Scan originalScan;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ // Start the minicluster
+ TEST_UTIL.startMiniCluster(2);
+ // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" and "zzz*", so that
+ // the scan hits all the regions and the rows do not all lie in a single region
+ try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+ table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+ new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
+ }
+ CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+ NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws IOException {
+ Scan scan = new Scan(originalScan);
+ if (originalScan.isReversed()) {
+ scan.withStartRow(largerRow, true);
+ scan.withStopRow(smallerRow, true);
+ } else {
+ scan.withStartRow(smallerRow, true);
+ scan.withStopRow(largerRow, true);
+ }
+ return scan;
+ }
+
+ private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int expectedCount)
+ throws IOException {
+ int countOfRows = 0;
+ ScanMetrics scanMetrics;
+ try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = table.getScanner(scan)) {
+ for (Result result : scanner) {
+ Assert.assertFalse(result.isEmpty());
+ countOfRows++;
+ }
+ scanMetrics = scanner.getScanMetrics();
+ }
+ Assert.assertEquals(expectedCount, countOfRows);
+ return scanMetrics;
+ }
+
+ @Test
+ public void testScanMetricsDisabled() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
+ Assert.assertNull(scanMetrics);
+ }
+
+ @Test
+ public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ scan.setScanMetricsEnabled(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
+ // The test setup is such that we have 1 row per region in the scan range
+ Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+ }
+
+ @Test
+ public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ scan.setScanMetricsEnabled(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ // By default, counters are collected with reset set to true
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+ Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ // A subsequent read of the scan metrics should show all counters as 0
+ Assert.assertEquals(0, scanMetrics.countOfRegions.get());
+ Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get());
+ }
+
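The reset semantics the test above relies on, in one place. A sketch of the ScanMetrics accessors used throughout this patch (the scanMetrics variable is assumed to come from a finished scan, as in the tests):

// Sketch only: getMetricsMap() drains the counters, getMetricsMap(false) peeks.
Map<String, Long> drained = scanMetrics.getMetricsMap();     // returns values and resets them to 0
Map<String, Long> peeked = scanMetrics.getMetricsMap(false); // reads without resetting
// After the drain above, every counter in "peeked" reports 0.
// collectMetricsByRegion() follows the same convention for the per-region map.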
+ @Test
+ public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
+ scan.setEnableScanMetricsByRegion(true);
+ int expectedRowsScanned = 1;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
+ Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
+ Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ // As we scan a single row, the overall scan metrics match the per-region scan metrics
+ Assert.assertEquals(expectedRowsScanned, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(expectedRowsScanned,
+ (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+ Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ scan.setEnableScanMetricsByRegion(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+ Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+ Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRowsScanned.get());
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
+ Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
+ int rowsScannedAcrossAllRegions = 0;
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
+ rowsScannedAcrossAllRegions++;
+ } else {
+ assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+ Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
+ }
+
+ @Test
+ public void testScanMetricsByRegionReset() throws Exception {
+ Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+ scan.setEnableScanMetricsByRegion(true);
+ int expectedRowsScanned = 3;
+ ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, expectedRowsScanned);
+ Assert.assertNotNull(scanMetrics);
+
+ // Retrieve scan metrics by region as a map and reset
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ // We scan 1 row per region
+ Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+
+ // Scan metrics have already been reset, so now all counters should be 0
+ scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
+ // The size of the map should be the same as before
+ Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ // Counters should have been reset to 0
+ Assert.assertEquals(0, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(0, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+
+ @Test
+ public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws Exception {
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
+ TableName tableName = TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
+ + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ Map<ScanMetricsRegionInfo, Map<String, Long>> concurrentScanMetricsByRegion = new HashMap<>();
+
+ // Start two concurrent threads: one scans the table while the other periodically
+ // collects the scan metrics (resetting the counters to 0 as it goes).
+ Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setCaching(2);
+ try (ResultScanner rs = table.getScanner(scan)) {
+ ScanMetrics scanMetrics = rs.getScanMetrics();
+ AtomicInteger rowsScanned = new AtomicInteger(0);
+ CountDownLatch latch = new CountDownLatch(1);
+ Runnable tableScanner = new Runnable() {
+ public void run() {
+ for (Result r : rs) {
+ Assert.assertFalse(r.isEmpty());
+ rowsScanned.incrementAndGet();
+ }
+ latch.countDown();
+ }
+ };
+ Runnable metricsCollector =
+ getPeriodicScanMetricsCollector(scanMetrics, concurrentScanMetricsByRegion, latch);
+ executor.execute(tableScanner);
+ executor.execute(metricsCollector);
+ latch.await();
+ // Merge leftover scan metrics
+ mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
+ concurrentScanMetricsByRegion);
+ Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned.get());
+ }
+
+ Map<ScanMetricsRegionInfo, Map<String, Long>> expectedScanMetricsByRegion;
+
+ // Collect scan metrics by region from a single thread. Assert that the concurrent scan
+ // and metrics collection worked as expected.
+ scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setCaching(2);
+ try (ResultScanner rs = table.getScanner(scan)) {
+ ScanMetrics scanMetrics = rs.getScanMetrics();
+ int rowsScanned = 0;
+ for (Result r : rs) {
+ Assert.assertFalse(r.isEmpty());
+ rowsScanned++;
+ }
+ Assert.assertEquals(HBaseTestingUtil.ROWS.length, rowsScanned);
+ expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : expectedScanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ // Each region will have 26 * 26 + 26 + 1 rows except the last region, which will have 1 row
+ long rowsScannedFromMetrics = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ Assert.assertTrue(
+ rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 + 26 + 1));
+ }
+ }
+
+ // Assert on the scan metrics by region collected concurrently
+ Assert.assertEquals(expectedScanMetricsByRegion, concurrentScanMetricsByRegion);
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithRegionMove() throws Exception {
+ TableName tableName = TableName.valueOf(
+ TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMove");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ // Scan 2 regions with start row keys: bbb and ccc
+ byte[] bbb = Bytes.toBytes("bbb");
+ byte[] ccc = Bytes.toBytes("ccc");
+ byte[] ddc = Bytes.toBytes("ddc");
+ long expectedCountOfRowsScannedInMovedRegion = 0;
+ // ROWS is the data loaded by loadTable()
+ for (byte[] row : HBaseTestingUtil.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
+ expectedCountOfRowsScannedInMovedRegion++;
+ }
+ }
+ byte[] movedRegion = null;
+ ScanMetrics scanMetrics;
+
+ // Initialize the scan with maxResultSize roughly equal to the size of 50 rows.
+ Scan scan = generateScan(bbb, ddc);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(8000);
+
+ try (ResultScanner rs = table.getScanner(scan)) {
+ boolean isFirstScanOfRegion = true;
+ for (Result r : rs) {
+ byte[] row = r.getRow();
+ if (isFirstScanOfRegion) {
+ movedRegion = moveRegion(tableName, row);
+ isFirstScanOfRegion = false;
+ }
+ }
+ Assert.assertNotNull(movedRegion);
+
+ scanMetrics = rs.getScanMetrics();
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ long actualCountOfRowsScannedInMovedRegion = 0;
+ Set<ServerName> serversForMovedRegion = new HashSet<>();
+
+ // 2 regions scanned, with two entries for the first region as it moved mid-scan
+ Assert.assertEquals(3, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ if (scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion))) {
+ long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ actualCountOfRowsScannedInMovedRegion += rowsScanned;
+ serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());
+
+ Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
+ }
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion,
+ actualCountOfRowsScannedInMovedRegion);
+ Assert.assertEquals(2, serversForMovedRegion.size());
+ }
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithRegionSplit() throws Exception {
+ TableName tableName = TableName.valueOf(
+ TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ // Scan 1 region with start row key: bbb
+ byte[] bbb = Bytes.toBytes("bbb");
+ byte[] bmw = Bytes.toBytes("bmw");
+ byte[] ccb = Bytes.toBytes("ccb");
+ long expectedCountOfRowsScannedInRegion = 0;
+ // ROWS is the data loaded by loadTable()
+ for (byte[] row : HBaseTestingUtil.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0) {
+ expectedCountOfRowsScannedInRegion++;
+ }
+ }
+ ScanMetrics scanMetrics;
+ Set<String> expectedSplitRegionRes = new HashSet<>();
+
+ // Initialize the scan
+ Scan scan = generateScan(bbb, ccb);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(8000);
+
+ try (ResultScanner rs = table.getScanner(scan)) {
+ boolean isFirstScanOfRegion = true;
+ for (Result r : rs) {
+ if (isFirstScanOfRegion) {
+ splitRegion(tableName, bbb, bmw)
+ .forEach(region -> expectedSplitRegionRes.add(Bytes.toString(region)));
+ isFirstScanOfRegion = false;
+ }
+ }
+
+ scanMetrics = rs.getScanMetrics();
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+
+ long actualCountOfRowsScannedInRegion = 0;
+ long rpcRetriesCount = 0;
+ Set<String> splitRegionRes = new HashSet<>();
+
+ // 1 entry each for the parent region and the two child regions
+ Assert.assertEquals(3, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ actualCountOfRowsScannedInRegion += rowsScanned;
+ splitRegionRes.add(scanMetricsRegionInfo.getEncodedRegionName());
+
+ if (metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1) {
+ rpcRetriesCount++;
+ }
+
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ Assert.assertEquals(expectedCountOfRowsScannedInRegion, actualCountOfRowsScannedInRegion);
+ Assert.assertEquals(2, rpcRetriesCount);
+ Assert.assertEquals(expectedSplitRegionRes, splitRegionRes);
+ }
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionWithRegionMerge() throws Exception {
+ TableName tableName = TableName.valueOf(
+ TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
+ try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+ TEST_UTIL.loadTable(table, CF);
+
+ // Scan 2 regions with start row keys: bbb and ccc
+ byte[] bbb = Bytes.toBytes("bbb");
+ byte[] ccc = Bytes.toBytes("ccc");
+ byte[] ddc = Bytes.toBytes("ddc");
+ long expectedCountOfRowsScannedInRegions = 0;
+ // ROWS is the data loaded by loadTable()
+ for (byte[] row : HBaseTestingUtil.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0) {
+ expectedCountOfRowsScannedInRegions++;
+ }
+ }
+ ScanMetrics scanMetrics;
+ Set<String> expectedMergeRegionsRes = new HashSet<>();
+ String mergedRegionEncodedName = null;
+
+ // Initialize the scan
+ Scan scan = generateScan(bbb, ddc);
+ scan.setEnableScanMetricsByRegion(true);
+ scan.setMaxResultSize(8000);
+
+ try (ResultScanner rs = table.getScanner(scan)) {
+ boolean isFirstScanOfRegion = true;
+ for (Result r : rs) {
+ if (isFirstScanOfRegion) {
+ List<byte[]> out = mergeRegions(tableName, bbb, ccc);
+ // The entry with index 2 is the encoded region name of the merged region
+ mergedRegionEncodedName = Bytes.toString(out.get(2));
+ out.forEach(region -> expectedMergeRegionsRes.add(Bytes.toString(region)));
+ isFirstScanOfRegion = false;
+ }
+ }
+
+ scanMetrics = rs.getScanMetrics();
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ long actualCountOfRowsScannedInRegions = 0;
+ Set<String> mergeRegionsRes = new HashSet<>();
+ boolean containsMergedRegionInScanMetrics = false;
+
+ // 1 entry each for the old region from which the first row was scanned and the new merged region
+ Assert.assertEquals(2, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ long rowsScanned = metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+ actualCountOfRowsScannedInRegions += rowsScanned;
+ mergeRegionsRes.add(scanMetricsRegionInfo.getEncodedRegionName());
+ if (scanMetricsRegionInfo.getEncodedRegionName().equals(mergedRegionEncodedName)) {
+ containsMergedRegionInScanMetrics = true;
+ }
+
+ Assert.assertEquals(1, (long) metricsMap.get(RPC_RETRIES_METRIC_NAME));
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ Assert.assertEquals(expectedCountOfRowsScannedInRegions, actualCountOfRowsScannedInRegions);
+ Assert.assertTrue(expectedMergeRegionsRes.containsAll(mergeRegionsRes));
+ Assert.assertTrue(containsMergedRegionInScanMetrics);
+ }
+ } finally {
+ TEST_UTIL.deleteTable(tableName);
+ }
+ }
+
+ private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
+ CountDownLatch latch) {
+ return new Runnable() {
+ public void run() {
+ try {
+ while (latch.getCount() > 0) {
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
+ Thread.sleep(RAND.nextInt(10));
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
+ Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ if (dstMap.containsKey(scanMetricsRegionInfo)) {
+ Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
+ for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
+ String metricName = metricEntry.getKey();
+ Long existingValue = dstMetricsMap.get(metricName);
+ Long newValue = metricEntry.getValue();
+ dstMetricsMap.put(metricName, existingValue + newValue);
+ }
+ } else {
+ dstMap.put(scanMetricsRegionInfo, metricsMap);
+ }
+ }
+ }
+
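As a side note, the accumulation above can be written more compactly with Map.merge. A sketch of an equivalent helper (editorial illustration, not the patch's code; it also copies the source map on first sight of a region instead of aliasing it):

private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
  for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
    // computeIfAbsent yields a fresh per-region map the first time a region is seen
    Map<String, Long> dst = dstMap.computeIfAbsent(entry.getKey(), k -> new HashMap<>());
    // Long::sum adds each drained counter value onto what was accumulated so far
    entry.getValue().forEach((metric, value) -> dst.merge(metric, value, Long::sum));
  }
}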
+ /**
+ * Moves the region containing the given start row key from its original region server to some
+ * other region server. This is a synchronous method.
+ * @param tableName Name of the table to which the region to be moved belongs.
+ * @param startRow Start row key of the region to be moved.
+ * @return Encoded region name of the region which was moved.
+ */
+ private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ RegionLocator regionLocator = CONN.getRegionLocator(tableName);
+ HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
+ byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
+ ServerName initialServerName = loc.getServerName();
+
+ admin.move(encodedRegionName);
+
+ ServerName finalServerName = regionLocator.getRegionLocation(startRow, true).getServerName();
+
+ // Assert that the region actually moved
+ Assert.assertNotEquals(initialServerName, finalServerName);
+ return encodedRegionName;
+ }
+
+ /**
+ * Splits the region with the given start row key at the split key provided. This is a
+ * synchronous method.
+ * @param tableName Table name of the region to be split.
+ * @param startRow Start row key of the region to be split.
+ * @param splitKey Split key for splitting the region.
+ * @return List of encoded region names with the first element being the parent region followed
+ * by the two child regions.
+ */
+ private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
+ throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ RegionLocator regionLocator = CONN.getRegionLocator(tableName);
+ HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
+ byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ ServerName initialTopServerName = topLoc.getServerName();
+ HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+ byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+ ServerName initialBottomServerName = bottomLoc.getServerName();
+
+ // Assert the region is ready for split
+ Assert.assertEquals(initialTopServerName, initialBottomServerName);
+ Assert.assertEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
+
+ FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
+
+ topLoc = regionLocator.getRegionLocation(startRow, true);
+ byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+ byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+ // Assert that the region split is complete
+ Assert.assertNotEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
+ Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedBottomRegionName);
+ Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
+
+ return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
+ finalEncodedBottomRegionName);
+ }
+
+ /**
+ * Merges the two regions whose start row keys are topRegion and bottomRegion. Ensures that the
+ * regions to be merged are adjacent regions. This is a synchronous method.
+ * @param tableName Table name of the regions to be merged.
+ * @param topRegion Start row key of the first region for merging.
+ * @param bottomRegion Start row key of the second region for merging.
+ * @return List of encoded region names with the first two elements being the original regions
+ * followed by the merged region.
+ */
+ private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
+ throws IOException {
+ Admin admin = TEST_UTIL.getAdmin();
+ RegionLocator regionLocator = CONN.getRegionLocator(tableName);
+ HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
+ byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
+ HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+ byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+ String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
+
+ // Assert that the regions are ready to be merged
+ Assert.assertNotEquals(initialEncodedTopRegionName, initialEncodedBottomRegionName);
+ Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
+
+ FutureUtils.get(admin.mergeRegionsAsync(
+ new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
+
+ topLoc = regionLocator.getRegionLocation(topRegion, true);
+ byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+ bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+ byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+ // Assert that the regions have been merged successfully
+ Assert.assertEquals(finalEncodedTopRegionName, finalEncodedBottomRegionName);
+ Assert.assertNotEquals(initialEncodedTopRegionName, finalEncodedTopRegionName);
+ Assert.assertNotEquals(initialEncodedBottomRegionName, finalEncodedTopRegionName);
+
+ return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
+ finalEncodedTopRegionName);
+ }
+}
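One takeaway from the move/split/merge tests above: when a region changes mid-scan, collectMetricsByRegion() can return multiple entries for what the client saw as one region range, keyed by (server name, encoded region name). A sketch of re-attributing rows per encoded region name by summing such entries (illustrative only; scanMetrics and the static metric-name imports are assumed as in the tests):

Map<String, Long> rowsPerRegion = new HashMap<>();
scanMetrics.collectMetricsByRegion(false).forEach((info, m) -> rowsPerRegion
  .merge(info.getEncodedRegionName(), m.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME), Long::sum));
// Two entries that share an encoded region name but differ in server collapse
// into that region's total, which is what the tests assert against.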
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 4f921540a17..621ecc08778 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -31,6 +35,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.StartTestingClusterOption;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -244,6 +250,84 @@ public class TestTableSnapshotScanner {
testScanner(UTIL, "testWithMultiRegion", 20, true);
}
+ private ScanMetrics createTableSnapshotScannerAndGetScanMetrics(boolean enableScanMetrics,
+ boolean enableScanMetricsByRegion, byte[] endKey) throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName() + "_TABLE");
+ String snapshotName = name.getMethodName() + "_SNAPSHOT";
+ try {
+ createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
+ Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+ Scan scan = new Scan().withStartRow(bbb).withStopRow(endKey);
+ scan.setScanMetricsEnabled(enableScanMetrics);
+ scan.setEnableScanMetricsByRegion(enableScanMetricsByRegion);
+ Configuration conf = UTIL.getConfiguration();
+
+ TableSnapshotScanner snapshotScanner =
+ new TableSnapshotScanner(conf, restoreDir, snapshotName, scan);
+ verifyScanner(snapshotScanner, bbb, endKey);
+ return snapshotScanner.getScanMetrics();
+ } finally {
+ UTIL.getAdmin().deleteSnapshot(snapshotName);
+ UTIL.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testScanMetricsDisabled() throws Exception {
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(false, false, yyy);
+ Assert.assertNull(scanMetrics);
+ }
+
+ @Test
+ public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, false, yyy);
+ Assert.assertNotNull(scanMetrics);
+ int rowsScanned = 0;
+ for (byte[] row : HBaseTestingUtil.ROWS) {
+ if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, yyy) < 0) {
+ rowsScanned++;
+ }
+ }
+ Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+ Assert.assertEquals(rowsScanned, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+
+ @Test
+ public void testScanMetricsByRegionForSingleRegion() throws Exception {
+ // Scan a single row with row key bbb
+ byte[] bbc = Bytes.toBytes("bbc");
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, true, bbc);
+ Assert.assertNotNull(scanMetrics);
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ Assert.assertEquals(1, scanMetricsByRegion.size());
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ Assert.assertEquals(1, (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+ }
+ }
+
+ @Test
+ public void testScanMetricsByRegionForMultiRegion() throws Exception {
+ ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, true, yyy);
+ Assert.assertNotNull(scanMetrics);
+ Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+ for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : scanMetricsByRegion
+ .entrySet()) {
+ ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+ Map<String, Long> metricsMap = entry.getValue();
+ Assert.assertNull(scanMetricsRegionInfo.getServerName());
+ Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+ Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+ }
+ }
+
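A short sketch of the snapshot-scan case these tests cover (illustrative only; conf, restoreDir, and snapshotName are hypothetical). Snapshot scans read restored HFiles directly on the client, so no region server is involved and ScanMetricsRegionInfo.getServerName() is null, as asserted above:

Scan scan = new Scan();
scan.setEnableScanMetricsByRegion(true);
try (TableSnapshotScanner scanner =
    new TableSnapshotScanner(conf, restoreDir, snapshotName, scan)) {
  for (Result r : scanner) {
    // consume rows
  }
  scanner.getScanMetrics().collectMetricsByRegion(false)
    .forEach((info, m) -> System.out.println(info.getEncodedRegionName() + " -> " + m));
}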
@Test
public void testScannerWithRestoreScanner() throws Exception {
TableName tableName = TableName.valueOf("testScanner");