This is an automated email from the ASF dual-hosted git repository.
sanjeet pushed a commit to branch 5.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.3 by this push:
new 350f23b804 PHOENIX-7704: Integrate HBase's new Scan Latency Metrics
(#2333)
350f23b804 is described below
commit 350f23b8048ced4639d9dd4edc870682868e6f98
Author: sanjeet006py <[email protected]>
AuthorDate: Fri Dec 19 10:54:03 2025 +0530
PHOENIX-7704: Integrate HBase's new Scan Latency Metrics (#2333)
---
.../phoenix/iterate/ScanningResultIterator.java | 15 +
.../org/apache/phoenix/monitoring/MetricType.java | 12 +
.../phoenix/monitoring/ScanMetricsHolder.java | 50 +-
.../phoenix/coprocessor/DataTableScanMetrics.java | 124 +++++
.../UncoveredGlobalIndexRegionScanner.java | 73 +++
.../apache/phoenix/index/GlobalIndexChecker.java | 18 +
.../phoenix/monitoring/HBaseScanMetricsIT.java | 542 +++++++++++++++++++++
.../phoenix/monitoring/PhoenixMetricsIT.java | 11 +-
.../phoenix/compat/hbase/CompatScanMetrics.java | 79 +++
.../CompatThreadLocalServerSideScanMetrics.java | 39 ++
.../phoenix/compat/hbase/CompatScanMetrics.java | 79 +++
.../CompatThreadLocalServerSideScanMetrics.java | 39 ++
.../phoenix/compat/hbase/CompatScanMetrics.java | 79 +++
.../CompatThreadLocalServerSideScanMetrics.java | 39 ++
.../phoenix/compat/hbase/CompatScanMetrics.java | 94 ++++
.../CompatThreadLocalServerSideScanMetrics.java | 46 ++
16 files changed, 1337 insertions(+), 2 deletions(-)
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index dbca9f8b96..420d0d570f 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
import
org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
@@ -163,6 +164,20 @@ public class ScanningResultIterator implements
ResultIterator {
changeMetric(scanMetricsHolder.getCountOfBytesScanned(),
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter);
+ changeMetric(scanMetricsHolder.getFsReadTime(),
+ CompatScanMetrics.getFsReadTime(scanMetricsMap));
+ changeMetric(scanMetricsHolder.getCountOfBytesReadFromFS(),
+ CompatScanMetrics.getBytesReadFromFs(scanMetricsMap));
+ changeMetric(scanMetricsHolder.getCountOfBytesReadFromMemstore(),
+ CompatScanMetrics.getBytesReadFromMemstore(scanMetricsMap));
+ changeMetric(scanMetricsHolder.getCountOfBytesReadFromBlockcache(),
+ CompatScanMetrics.getBytesReadFromBlockCache(scanMetricsMap));
+ changeMetric(scanMetricsHolder.getCountOfBlockReadOps(),
+ CompatScanMetrics.getBlockReadOpsCount(scanMetricsMap));
+ changeMetric(scanMetricsHolder.getRpcScanProcessingTime(),
+ CompatScanMetrics.getRpcScanProcessingTime(scanMetricsMap));
+ changeMetric(scanMetricsHolder.getRpcScanQueueWaitTime(),
+ CompatScanMetrics.getRpcScanQueueWaitTime(scanMetricsMap));
changeMetric(GLOBAL_SCAN_BYTES,
scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
changeMetric(GLOBAL_HBASE_COUNT_RPC_CALLS,
scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 8ee8de6971..85cdb2d3bf 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -261,6 +261,18 @@ public enum MetricType {
COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries",
LogLevel.DEBUG, PLong.INSTANCE),
COUNT_ROWS_SCANNED("ws", "Number of rows scanned", LogLevel.DEBUG,
PLong.INSTANCE),
COUNT_ROWS_FILTERED("wf", "Number of rows filtered", LogLevel.DEBUG,
PLong.INSTANCE),
+ FS_READ_TIME("frt", "Time spent in filesystem read", LogLevel.DEBUG,
PLong.INSTANCE),
+ BYTES_READ_FROM_FS("brff", "Number of bytes read from filesystem",
LogLevel.DEBUG,
+ PLong.INSTANCE),
+ BYTES_READ_FROM_MEMSTORE("brfm", "Number of bytes read from memstore",
LogLevel.DEBUG,
+ PLong.INSTANCE),
+ BYTES_READ_FROM_BLOCKCACHE("brfc", "Number of bytes read from blockcache",
LogLevel.DEBUG,
+ PLong.INSTANCE),
+ BLOCK_READ_OPS_COUNT("broc", "Number of block read operations",
LogLevel.DEBUG, PLong.INSTANCE),
+ RPC_SCAN_PROCESSING_TIME("spt", "Time spent in RPC scan processing",
LogLevel.DEBUG,
+ PLong.INSTANCE),
+ RPC_SCAN_QUEUE_WAIT_TIME("sqwt", "Time spent in RPC scan queue wait",
LogLevel.DEBUG,
+ PLong.INSTANCE),
COUNTER_METADATA_INCONSISTENCY("mi", "Number of times the metadata
inconsistencies ",
LogLevel.DEBUG, PLong.INSTANCE),
NUM_SYSTEM_TABLE_RPC_SUCCESS("nstrs", "Number of successful system table RPC
calls",
diff --git
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index ea82467aa8..0c827602af 100644
---
a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++
b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -17,6 +17,10 @@
*/
package org.apache.phoenix.monitoring;
+import static org.apache.phoenix.monitoring.MetricType.BLOCK_READ_OPS_COUNT;
+import static
org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_BLOCKCACHE;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_FS;
+import static
org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_MEMSTORE;
import static
org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
import static
org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
import static
org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
@@ -28,7 +32,10 @@ import static
org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
+import static org.apache.phoenix.monitoring.MetricType.FS_READ_TIME;
import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER;
+import static
org.apache.phoenix.monitoring.MetricType.RPC_SCAN_PROCESSING_TIME;
+import static
org.apache.phoenix.monitoring.MetricType.RPC_SCAN_QUEUE_WAIT_TIME;
import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
import java.io.IOException;
@@ -52,6 +59,13 @@ public class ScanMetricsHolder {
private final CombinableMetric countOfRowsFiltered;
private final CombinableMetric countOfBytesScanned;
private final CombinableMetric countOfRowsPaged;
+ private final CombinableMetric fsReadTime;
+ private final CombinableMetric countOfBytesReadFromFS;
+ private final CombinableMetric countOfBytesReadFromMemstore;
+ private final CombinableMetric countOfBytesReadFromBlockcache;
+ private final CombinableMetric countOfBlockReadOps;
+ private final CombinableMetric rpcScanProcessingTime;
+ private final CombinableMetric rpcScanQueueWaitTime;
private Map<String, Long> scanMetricMap;
private Object scan;
@@ -83,6 +97,13 @@ public class ScanMetricsHolder {
countOfRowsFiltered = readMetrics.allotMetric(COUNT_ROWS_FILTERED,
tableName);
countOfBytesScanned = readMetrics.allotMetric(SCAN_BYTES, tableName);
countOfRowsPaged = readMetrics.allotMetric(PAGED_ROWS_COUNTER, tableName);
+ fsReadTime = readMetrics.allotMetric(FS_READ_TIME, tableName);
+ countOfBytesReadFromFS = readMetrics.allotMetric(BYTES_READ_FROM_FS,
tableName);
+ countOfBytesReadFromMemstore =
readMetrics.allotMetric(BYTES_READ_FROM_MEMSTORE, tableName);
+ countOfBytesReadFromBlockcache =
readMetrics.allotMetric(BYTES_READ_FROM_BLOCKCACHE, tableName);
+ countOfBlockReadOps = readMetrics.allotMetric(BLOCK_READ_OPS_COUNT,
tableName);
+ rpcScanProcessingTime = readMetrics.allotMetric(RPC_SCAN_PROCESSING_TIME,
tableName);
+ rpcScanQueueWaitTime = readMetrics.allotMetric(RPC_SCAN_QUEUE_WAIT_TIME,
tableName);
}
public CombinableMetric getCountOfRemoteRPCcalls() {
@@ -141,6 +162,34 @@ public class ScanMetricsHolder {
return countOfRowsPaged;
}
+ public CombinableMetric getFsReadTime() {
+ return fsReadTime; // metric allotted from readMetrics as MetricType.FS_READ_TIME
+ }
+
+ public CombinableMetric getCountOfBytesReadFromFS() {
+ return countOfBytesReadFromFS; // metric allotted from readMetrics as MetricType.BYTES_READ_FROM_FS
+ }
+
+ public CombinableMetric getCountOfBytesReadFromMemstore() {
+ return countOfBytesReadFromMemstore; // metric allotted as MetricType.BYTES_READ_FROM_MEMSTORE
+ }
+
+ public CombinableMetric getCountOfBytesReadFromBlockcache() {
+ return countOfBytesReadFromBlockcache; // metric allotted as MetricType.BYTES_READ_FROM_BLOCKCACHE
+ }
+
+ public CombinableMetric getCountOfBlockReadOps() {
+ return countOfBlockReadOps; // metric allotted from readMetrics as MetricType.BLOCK_READ_OPS_COUNT
+ }
+
+ public CombinableMetric getRpcScanProcessingTime() {
+ return rpcScanProcessingTime; // metric allotted as MetricType.RPC_SCAN_PROCESSING_TIME
+ }
+
+ public CombinableMetric getRpcScanQueueWaitTime() {
+ return rpcScanQueueWaitTime; // metric allotted as MetricType.RPC_SCAN_QUEUE_WAIT_TIME
+ }
+
public void setScanMetricMap(Map<String, Long> scanMetricMap) {
this.scanMetricMap = scanMetricMap;
}
@@ -154,5 +203,4 @@ public class ScanMetricsHolder {
return "{\"Exception while converting scan metrics to Json\":\"" +
e.getMessage() + "\"}";
}
}
-
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DataTableScanMetrics.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DataTableScanMetrics.java
new file mode 100644
index 0000000000..e72216586c
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DataTableScanMetrics.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
+import org.apache.phoenix.compat.hbase.CompatThreadLocalServerSideScanMetrics;
+
+/**
+ * Immutable holder for scan metrics from data table operations performed during:
+ * <ul>
+ * <li>Uncovered global index scans</li>
+ * <li>Read repair operations</li>
+ * </ul>
+ * These metrics help identify latency variations that occur when both data table and index
+ * table are scanned together, and are used to populate
+ * {@link ThreadLocalServerSideScanMetrics} for index table RPC calls.
+ * <p>
+ * Five values are captured: filesystem read time, bytes read from the filesystem, bytes read
+ * from the memstore, bytes read from the block cache, and the number of block read operations.
+ * All fields are final; instances are normally obtained from {@link Builder#build()}.
+ */
+public class DataTableScanMetrics {
+ private final long fsReadTimeInMs;
+ private final long bytesReadFromFS;
+ private final long bytesReadFromMemstore;
+ private final long bytesReadFromBlockcache;
+ private final long blockReadOps;
+ /**
+ * Creates a holder from already-extracted metric values. The constructor is protected, so it
+ * is reachable from subclasses and from {@link Builder#build()} but not from unrelated callers
+ * outside this package.
+ */
+ protected DataTableScanMetrics(long fsReadTimeInMs, long bytesReadFromFS,
+ long bytesReadFromMemstore, long bytesReadFromBlockcache, long
blockReadOps) {
+ this.fsReadTimeInMs = fsReadTimeInMs;
+ this.bytesReadFromFS = bytesReadFromFS;
+ this.bytesReadFromMemstore = bytesReadFromMemstore;
+ this.bytesReadFromBlockcache = bytesReadFromBlockcache;
+ this.blockReadOps = blockReadOps;
+ }
+
+ public long getFsReadTimeInMs() {
+ return fsReadTimeInMs;
+ }
+
+ public long getBytesReadFromFS() {
+ return bytesReadFromFS;
+ }
+
+ public long getBytesReadFromMemstore() {
+ return bytesReadFromMemstore;
+ }
+
+ public long getBytesReadFromBlockcache() {
+ return bytesReadFromBlockcache;
+ }
+
+ public long getBlockReadOps() {
+ return blockReadOps;
+ }
+ /**
+ * Mutable builder for {@link DataTableScanMetrics}. Every value starts at 0 and each setter
+ * returns this builder so calls can be chained. The fields are protected, which lets a
+ * subclass builder read them when constructing its own metrics type.
+ */
+ public static class Builder {
+ protected long fsReadTimeInMs = 0;
+ protected long bytesReadFromFS = 0;
+ protected long bytesReadFromMemstore = 0;
+ protected long bytesReadFromBlockcache = 0;
+ protected long blockReadOps = 0;
+
+ public Builder setFsReadTimeInMs(long fsReadTimeInMs) {
+ this.fsReadTimeInMs = fsReadTimeInMs;
+ return this;
+ }
+
+ public Builder setBytesReadFromFS(long bytesReadFromFS) {
+ this.bytesReadFromFS = bytesReadFromFS;
+ return this;
+ }
+
+ public Builder setBytesReadFromMemstore(long bytesReadFromMemstore) {
+ this.bytesReadFromMemstore = bytesReadFromMemstore;
+ return this;
+ }
+
+ public Builder setBytesReadFromBlockcache(long bytesReadFromBlockcache) {
+ this.bytesReadFromBlockcache = bytesReadFromBlockcache;
+ return this;
+ }
+
+ public Builder setBlockReadOps(long blockReadOps) {
+ this.blockReadOps = blockReadOps;
+ return this;
+ }
+ /**
+ * Builds a {@link DataTableScanMetrics} from the values set so far (0 for any value that was
+ * never set).
+ */
+ public DataTableScanMetrics build() {
+ return new DataTableScanMetrics(fsReadTimeInMs, bytesReadFromFS,
bytesReadFromMemstore,
+ bytesReadFromBlockcache, blockReadOps);
+ }
+ }
+ /**
+ * Copies the five read metrics out of {@code scanMetrics} into {@code builder}, reading each
+ * value through the matching {@code CompatScanMetrics} accessor.
+ * NOTE(review): how a null {@code scanMetrics} is handled depends on CompatScanMetrics, which
+ * is not visible here - confirm before passing null.
+ * @param scanMetrics client-side scan metrics of the data table scan
+ * @param builder builder that receives the extracted values; it is modified in place
+ */
+ public static void populateDataTableScanMetrics(ScanMetrics scanMetrics,
Builder builder) {
+ builder.setFsReadTimeInMs(CompatScanMetrics.getFsReadTime(scanMetrics))
+ .setBytesReadFromFS(CompatScanMetrics.getBytesReadFromFs(scanMetrics))
+
.setBytesReadFromMemstore(CompatScanMetrics.getBytesReadFromMemstore(scanMetrics))
+
.setBytesReadFromBlockcache(CompatScanMetrics.getBytesReadFromBlockCache(scanMetrics))
+ .setBlockReadOps(CompatScanMetrics.getBlockReadOpsCount(scanMetrics));
+ }
+ /**
+ * Adds the five values held by this object to the server-side scan metrics through the
+ * {@code CompatThreadLocalServerSideScanMetrics.add*} methods.
+ * NOTE(review): presumably these land in the counters of the calling thread, so this should be
+ * invoked on the thread serving the index table RPC - confirm against the compat class.
+ */
+ public void populateThreadLocalServerSideScanMetrics() {
+ CompatThreadLocalServerSideScanMetrics.addFsReadTime(fsReadTimeInMs);
+ CompatThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesReadFromFS);
+
CompatThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(bytesReadFromMemstore);
+
CompatThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(bytesReadFromBlockcache);
+ CompatThreadLocalServerSideScanMetrics.addBlockReadOpsCount(blockReadOps);
+ }
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index e0c9f6298f..0a8a0adcde 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.coprocessor;
import static
org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHYSICAL_DATA_TABLE_NAME;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -31,12 +32,14 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
import org.apache.phoenix.execute.TupleProjector;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.Task;
@@ -71,6 +74,8 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
protected final int rowCountPerTask;
protected String exceptionMessage;
protected final HTableFactory hTableFactory;
+ private final boolean isScanMetricsEnabled;
+ private List<DataTableScanMetricsWithScanTime> dataTableScanMetrics;
// This relies on Hadoop Configuration to handle warning about deprecated
configs and
// to set the correct non-deprecated configs when an old one shows up.
@@ -101,6 +106,11 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
ScanUtil.addEmptyColumnToScan(dataTableScan,
indexMaintainer.getDataEmptyKeyValueCF(),
indexMaintainer.getEmptyKeyValueQualifierForDataTable());
}
+ isScanMetricsEnabled =
+ scan.isScanMetricsEnabled() &&
CompatScanMetrics.supportsFineGrainedReadMetrics();
+ if (isScanMetricsEnabled) {
+ dataTableScanMetrics = new ArrayList<>();
+ }
}
@Override
@@ -117,6 +127,7 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
if (dataScan == null) {
return;
}
+ dataScan.setScanMetricsEnabled(isScanMetricsEnabled);
try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
for (Result result = resultScanner.next(); (result != null); result =
resultScanner.next()) {
if (ScanUtil.isDummy(result)) {
@@ -134,6 +145,13 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
+ region.getRegionInfo().getRegionNameAsString() + " could not
complete on time (in "
+ pageSizeMs + " ms) and" + " will be resubmitted");
}
+ if (isScanMetricsEnabled) {
+ ScanMetrics scanMetrics = resultScanner.getScanMetrics();
+ long scanTimeInMs = EnvironmentEdgeManager.currentTimeMillis() -
startTime;
+ // Capture scan time to identify slowest parallel scan later as that's
the one which slows
+ // down the whole merge operation from data table.
+ dataTableScanMetrics.add(buildDataTableScanMetrics(scanMetrics,
scanTimeInMs));
+ }
} catch (Throwable t) {
exceptionMessage = "scanDataRows fails for at least one task";
ClientUtil.throwIOException(dataHTable.getName().toString(), t);
@@ -208,10 +226,65 @@ public class UncoveredGlobalIndexRegionScanner extends
UncoveredIndexRegionScann
addTasksForScanningDataTableRowsInParallel(tasks, setList.get(i),
startTime);
}
submitTasks(tasks);
+ if (isScanMetricsEnabled) {
+ DataTableScanMetricsWithScanTime dataTableScanMetricsForSlowestScan =
null;
+ for (DataTableScanMetricsWithScanTime dataTableScanMetrics :
dataTableScanMetrics) {
+ if (dataTableScanMetricsForSlowestScan == null) {
+ dataTableScanMetricsForSlowestScan = dataTableScanMetrics;
+ } else if (
+ dataTableScanMetricsForSlowestScan.getScanTimeInMs()
+ < dataTableScanMetrics.getScanTimeInMs()
+ ) {
+ dataTableScanMetricsForSlowestScan = dataTableScanMetrics;
+ }
+ }
+ if (dataTableScanMetricsForSlowestScan != null) {
+
dataTableScanMetricsForSlowestScan.populateThreadLocalServerSideScanMetrics();
+ }
+ }
if (state == State.SCANNING_DATA_INTERRUPTED) {
state = State.SCANNING_DATA;
} else {
state = State.READY;
}
}
+ /**
+ * Packages the read metrics of one data table scan together with the time that scan took.
+ * @param scanMetrics metrics obtained from the data table result scanner
+ * @param scanTimeInMs elapsed time of the scan in milliseconds, as measured by the caller
+ * @return a metrics holder carrying both the read metrics and the scan time
+ */
+ private static DataTableScanMetricsWithScanTime
buildDataTableScanMetrics(ScanMetrics scanMetrics,
+ long scanTimeInMs) {
+ DataTableScanMetricsWithScanTime.Builder builder =
+ new DataTableScanMetricsWithScanTime.Builder();
+ builder.setScanTimeInMs(scanTimeInMs);
+ DataTableScanMetrics.populateDataTableScanMetrics(scanMetrics, builder);
+ return builder.build();
+ }
+ /**
+ * {@link DataTableScanMetrics} extended with the elapsed time of the scan in milliseconds. The
+ * enclosing scanner keeps one instance per data table scan and, once the tasks have been
+ * submitted, publishes only the instance with the largest scan time.
+ * NOTE(review): the instances are appended to an ArrayList from scanDataRows, which the
+ * surrounding code describes as parallel scans, and the list is not visibly cleared between
+ * batches - confirm that additions are safe under concurrency and that stale entries from
+ * earlier batches cannot be published again.
+ */
+ private static class DataTableScanMetricsWithScanTime extends
DataTableScanMetrics {
+ private final long scanTimeInMs;
+
+ DataTableScanMetricsWithScanTime(long scanTimeInMs, long fsReadTimeInMs,
long bytesReadFromFS,
+ long bytesReadFromMemstore, long bytesReadFromBlockcache, long
blockReadOps) {
+ super(fsReadTimeInMs, bytesReadFromFS, bytesReadFromMemstore,
bytesReadFromBlockcache,
+ blockReadOps);
+ this.scanTimeInMs = scanTimeInMs;
+ }
+
+ long getScanTimeInMs() {
+ return scanTimeInMs;
+ }
+ /**
+ * Builder that adds the scan time (default 0) to the values inherited from
+ * {@link DataTableScanMetrics.Builder} and produces the scan-time-aware subtype.
+ */
+ private static class Builder extends DataTableScanMetrics.Builder {
+ private long scanTimeInMs = 0;
+
+ public Builder setScanTimeInMs(long scanTimeInMs) {
+ this.scanTimeInMs = scanTimeInMs;
+ return this;
+ }
+
+ @Override
+ public DataTableScanMetricsWithScanTime build() {
+ return new DataTableScanMetricsWithScanTime(scanTimeInMs,
fsReadTimeInMs, bytesReadFromFS,
+ bytesReadFromMemstore, bytesReadFromBlockcache, blockReadOps);
+ }
+ }
+ }
}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 97f23c498f..7b0a2e3f82 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -56,8 +57,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
import org.apache.phoenix.coprocessor.BaseRegionScanner;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.DataTableScanMetrics;
import org.apache.phoenix.coprocessor.DelegateRegionScanner;
import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
import org.apache.phoenix.filter.EmptyColumnOnlyFilter;
@@ -155,6 +158,7 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
private String indexName;
private long pageSizeMs;
private boolean initialized = false;
+ private boolean isScanMetricsEnabled = false;
public GlobalIndexScanner(RegionCoprocessorEnvironment env, Scan scan,
RegionScanner scanner,
GlobalIndexCheckerSource metricsSource) throws IOException {
@@ -186,6 +190,8 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
DEFAULT_REPAIR_LOGGING_PERCENT);
random = new Random(EnvironmentEdgeManager.currentTimeMillis());
pageSizeMs = getPageSizeMsForRegionScanner(scan);
+ isScanMetricsEnabled =
+ scan.isScanMetricsEnabled() &&
CompatScanMetrics.supportsFineGrainedReadMetrics();
}
@Override
@@ -343,6 +349,12 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
return scanner.getMvccReadPoint();
}
+ private DataTableScanMetrics buildDataTableScanMetrics(ScanMetrics
scanMetrics) { // wraps the read metrics of a data table scan in a DataTableScanMetrics
+ DataTableScanMetrics.Builder builder = new
DataTableScanMetrics.Builder();
+ DataTableScanMetrics.populateDataTableScanMetrics(scanMetrics, builder); // fills builder in place
+ return builder.build();
+ }
+
private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row)
throws IOException {
if (buildIndexScanForDataTable == null) {
buildIndexScanForDataTable = new Scan();
@@ -400,8 +412,14 @@ public class GlobalIndexChecker extends
BaseScannerRegionObserver implements Reg
buildIndexScanForDataTable.setAttribute(BaseScannerRegionObserverConstants.INDEX_ROW_KEY,
indexRowKey);
Result result = null;
+ buildIndexScanForDataTable.setScanMetricsEnabled(isScanMetricsEnabled);
try (ResultScanner resultScanner =
dataHTable.getScanner(buildIndexScanForDataTable)) {
result = resultScanner.next();
+ if (isScanMetricsEnabled) {
+ ScanMetrics scanMetrics = resultScanner.getScanMetrics();
+ DataTableScanMetrics dataTableScanMetrics =
buildDataTableScanMetrics(scanMetrics);
+ dataTableScanMetrics.populateThreadLocalServerSideScanMetrics();
+ }
} catch (Throwable t) {
ClientUtil.throwIOException(dataHTable.getName().toString(), t);
}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/HBaseScanMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/HBaseScanMetricsIT.java
new file mode 100644
index 0000000000..a8f86f7b27
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/HBaseScanMetricsIT.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CompactSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExplainPlanAttributes;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HBaseScanMetricsIT extends BaseTest {
+ /**
+ * Class-level setup. Skips every test in this class (JUnit assumption) unless the running
+ * HBase version compares greater than 2.6.3, then starts the test driver with request-level
+ * metrics collection enabled, the memstore periodic flush interval set to 0 and region server
+ * compaction disabled.
+ * NOTE(review): presumably the flush/compaction settings keep data where the tests expect it
+ * (memstore until an explicit flush, a single store file afterwards) - confirm.
+ */
+ @BeforeClass
+ public static void setup() throws Exception {
+ Assume.assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(),
"2.6.3") > 0);
+ Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+ props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
+ props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
+ props.put(CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION, "false");
+ setUpTestDriver(new ReadOnlyProps(props));
+ }
+ /**
+ * Runs the same single-row point lookup three times and checks the read metrics after each
+ * run: before any flush (assertOnReadsFromMemstore), right after flushing the table
+ * (assertOnReadsFromFs, expecting 2 block reads), and once more without further changes
+ * (assertOnReadsFromBlockcache).
+ * NOTE(review): the Statement and ResultSets are not closed explicitly; presumably they are
+ * released with the connection - confirm.
+ */
+ @Test
+ public void testSinglePointLookupQuery() throws Exception {
+ String tableName = generateUniqueName();
+ String sql = "SELECT * FROM " + tableName + " WHERE k1 = 1 AND k2 = 'a'";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTableAndUpsertData(conn, tableName, "");
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+ TestUtil.flush(utility, TableName.valueOf(tableName));
+ rs = stmt.executeQuery(sql);
+ // 1 Data Block + 1 Bloom Block
+ assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 2);
+ rs = stmt.executeQuery(sql);
+ assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+ }
+ }
+ /**
+ * Same memstore / filesystem / block cache sequence as the single point lookup test, but with
+ * a multi-key IN query on a table created with BLOOMFILTER='NONE'. After the flush only 1
+ * block read (the data block) is expected.
+ */
+ @Test
+ public void testMultiPointLookupQueryWithoutBloomFilter() throws Exception {
+ String tableName = generateUniqueName();
+ String sql = "SELECT * FROM " + tableName + " WHERE k1 IN (1, 2, 3) AND k2
IN ('a', 'b', 'c')";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTableAndUpsertData(conn, tableName, "BLOOMFILTER='NONE'");
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+ TestUtil.flush(utility, TableName.valueOf(tableName));
+ rs = stmt.executeQuery(sql);
+ // 1 Data Block
+ assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+ rs = stmt.executeQuery(sql);
+ assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+ }
+ }
+ /**
+ * Multi-key IN query on a table created with the "phoenix.bloomfilter.multikey.pointlookup"
+ * option set to true. Follows the memstore / filesystem / block cache sequence; after the
+ * flush 2 block reads are expected (data block plus bloom block).
+ * NOTE(review): the meaning of the extra {@code true} argument to assertOnReadsFromFs is
+ * defined by that helper, which is not visible here.
+ */
+ @Test
+ public void testMultiPointLookupQueryWithBloomFilter() throws Exception {
+ String tableName = generateUniqueName();
+ String sql = "SELECT * FROM " + tableName + " WHERE k1 IN (1, 2, 3) AND k2
IN ('a', 'b', 'c')";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTableAndUpsertData(conn, tableName,
+ "\"phoenix.bloomfilter.multikey.pointlookup\"=true");
+ Statement stmt = conn.createStatement();
+ ResultSet rs = stmt.executeQuery(sql);
+ assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+ TestUtil.flush(utility, TableName.valueOf(tableName));
+ rs = stmt.executeQuery(sql);
+ // 1 Data Block + 1 Bloom Block
+ assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 2, true);
+ rs = stmt.executeQuery(sql);
+ assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+ }
+ }
+ /**
+ * Checks the read metrics reported for the source table of a mutation that reads from one
+ * table and writes to another. The metrics come from getMutationReadMetrics and are checked
+ * three times: before the source table is flushed (memstore), right after the flush
+ * (filesystem, 1 block read expected) and once more (block cache).
+ * NOTE(review): the last argument of getMutationReadMetrics (1, 2, 3) is interpreted by that
+ * helper, which is not visible here.
+ */
+ @Test
+ public void testBytesReadInClientUpsertSelect() throws Exception {
+ String sourceTableName = generateUniqueName();
+ String targetTableName = generateUniqueName();
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTableAndUpsertData(conn, sourceTableName, "");
+ createTable(conn, targetTableName, "");
+ assertOnReadsFromMemstore(sourceTableName,
+ getMutationReadMetrics(conn, targetTableName, sourceTableName, 1));
+ TestUtil.flush(utility, TableName.valueOf(sourceTableName));
+ // 1 Data Block from source table
+ assertOnReadsFromFs(sourceTableName,
+ getMutationReadMetrics(conn, targetTableName, sourceTableName, 2), 1);
+ assertOnReadsFromBlockcache(sourceTableName,
+ getMutationReadMetrics(conn, targetTableName, sourceTableName, 3));
+ }
+ }
+ /**
+ * Upserts three rows, commits, and runs an ungrouped MAX aggregate filtered on k1 = 1 three
+ * times, checking the read metrics for memstore (before flush), filesystem (right after the
+ * flush, 1 block read expected) and block cache (repeat run).
+ */
+ @Test
+ public void testAggregateQueryWithoutGroupBy() throws Exception {
+ String tableName = generateUniqueName();
+ String sql = "SELECT MAX(v1) FROM " + tableName + " WHERE k1 = 1";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTable(conn, tableName, "");
+ Statement stmt = conn.createStatement();
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1,
'a', 'a1', 'a2')");
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1,
'b', 'b1', 'b2')");
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3,
'c', 'c1', 'c2')");
+ conn.commit();
+ ResultSet rs = stmt.executeQuery(sql);
+ assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+ TestUtil.flush(utility, TableName.valueOf(tableName));
+ rs = stmt.executeQuery(sql);
+ // 1 Data Block from table
+ assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+ rs = stmt.executeQuery(sql);
+ assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+ }
+ }
+ /**
+ * Upserts five rows, commits, and runs a MAX aggregate filtered on v2 = 'v2' and grouped by k1
+ * three times, checking the read metrics for memstore (before flush), filesystem (right after
+ * the flush, 1 block read expected) and block cache (repeat run).
+ */
+ @Test
+ public void testAggregateQueryWithGroupBy() throws Exception {
+ String tableName = generateUniqueName();
+ String sql = "SELECT MAX(v1) FROM " + tableName + " WHERE v2 = 'v2' GROUP
BY k1";
+ try (Connection conn = DriverManager.getConnection(getUrl())) {
+ createTable(conn, tableName, "");
+ Statement stmt = conn.createStatement();
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1,
'a', 'a1', 'v2')");
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1,
'b', 'b1', 'b2')");
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1,
'c', 'c1', 'v2')");
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3,
'd', 'd1', 'd2')");
+ stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3,
'e', 'e1', 'v2')");
+ conn.commit();
+ ResultSet rs = stmt.executeQuery(sql);
+ assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+ TestUtil.flush(utility, TableName.valueOf(tableName));
+ rs = stmt.executeQuery(sql);
+ // 1 Data Block from table
+ assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+ rs = stmt.executeQuery(sql);
+ assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+ }
+ }
+
+  /**
+   * Checks the memstore, file-system and block-cache read metrics of a {@code GROUP BY ... ORDER
+   * BY} aggregate query: before a flush, right after the flush, and on a repeated read.
+   */
+  @Test
+  public void testAggregateQueryWithGroupByAndOrderBy() throws Exception {
+    String tableName = generateUniqueName();
+    String sql = "SELECT v2, MAX(v1) FROM " + tableName + " GROUP BY v2 ORDER BY v2";
+    // The statement is a try-with-resources resource so that it, and any result set it still
+    // holds open, is released even when an assertion fails.
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) {
+      createTable(conn, tableName, "");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'a', 'a1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'c', 'c1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'd', 'd1', 'd2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'e', 'e1', 'v2')");
+      conn.commit();
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block from table
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  /**
+   * Checks that a {@code UNION ALL} over two tables reports memstore, file-system and
+   * block-cache read metrics separately for each of the two tables.
+   */
+  @Test
+  public void testUnionAllQuery() throws Exception {
+    String tableName1 = generateUniqueName();
+    String tableName2 = generateUniqueName();
+    // The statement is a try-with-resources resource so that it, and any result set it still
+    // holds open, is released even when an assertion fails.
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) {
+      createTable(conn, tableName1, "");
+      stmt.execute("UPSERT INTO " + tableName1 + " (k1, k2, v1, v2) VALUES (1, 'a', 'a1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName1 + " (k1, k2, v1, v2) VALUES (1, 'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName1 + " (k1, k2, v1, v2) VALUES (1, 'c', 'c1', 'v2')");
+      conn.commit();
+      createTable(conn, tableName2, "");
+      stmt.execute("UPSERT INTO " + tableName2 + " (k1, k2, v1, v2) VALUES (3, 'd', 'd1', 'd2')");
+      stmt.execute("UPSERT INTO " + tableName2 + " (k1, k2, v1, v2) VALUES (3, 'e', 'e1', 'v2')");
+      conn.commit();
+      String sql = "SELECT MAX(v1) FROM (SELECT k1, v1 FROM " + tableName1
+        + " UNION ALL SELECT k1, v1 FROM " + tableName2 + ") GROUP BY k1 HAVING MAX(v1) > 'c1'";
+      ResultSet rs = stmt.executeQuery(sql);
+      Map<String, Map<MetricType, Long>> readMetrics = getQueryReadMetrics(rs);
+      assertOnReadsFromMemstore(tableName1, readMetrics);
+      assertOnReadsFromMemstore(tableName2, readMetrics);
+      TestUtil.flush(utility, TableName.valueOf(tableName1));
+      TestUtil.flush(utility, TableName.valueOf(tableName2));
+      rs = stmt.executeQuery(sql);
+      // 1 Data block per table in UNION ALL
+      readMetrics = getQueryReadMetrics(rs);
+      assertOnReadsFromFs(tableName1, readMetrics, 1);
+      assertOnReadsFromFs(tableName2, readMetrics, 1);
+      rs = stmt.executeQuery(sql);
+      readMetrics = getQueryReadMetrics(rs);
+      assertOnReadsFromBlockcache(tableName1, readMetrics);
+      assertOnReadsFromBlockcache(tableName2, readMetrics);
+    }
+  }
+
+  /**
+   * Checks the read metrics of a self-join between an aggregating sub-query and a filtering
+   * sub-query on the same table: before a flush, right after the flush, and on a repeated read.
+   */
+  @Test
+  public void testJoinQuery() throws Exception {
+    String tableName = generateUniqueName();
+    // The statement is a try-with-resources resource so that it, and any result set it still
+    // holds open, is released even when an assertion fails.
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) {
+      createTable(conn, tableName, "");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'a', 'a1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'c', 'c1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'd', 'd1', 'd2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'e', 'e1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'f', 'f1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (5, 'g', 'g1', 'v2')");
+      conn.commit();
+      String sql =
+        "SELECT a.k1 as k1, b.k2 as k2, b.v1 as v1, a.total_count as total_count FROM "
+          + "(SELECT k1, COUNT(*) as total_count FROM " + tableName
+          + " WHERE k1 IN (1, 3) GROUP BY k1) a JOIN (SELECT k1, k2, v1 FROM " + tableName
+          + " WHERE k1 IN (1, 3) AND k2 = 'a') b ON a.k1 = b.k1";
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 data block is read from the file system for the left table; the data block for the
+      // right table is read from the block cache
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1, true);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  /**
+   * Checks the read metrics of a query that the optimizer routes through an uncovered index.
+   * The metrics are looked up under the index name, and the post-flush block count includes the
+   * data-table block fetched for the uncovered columns.
+   */
+  @Test
+  public void testQueryOnUncoveredIndex() throws Exception {
+    String tableName = generateUniqueName();
+    String indexName = generateUniqueName();
+    // The statement is a try-with-resources resource so that it, and any result set it still
+    // holds open, is released even when an assertion fails.
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) {
+      createTable(conn, tableName, "");
+      stmt.execute("CREATE UNCOVERED INDEX " + indexName + " ON " + tableName + " (v1)");
+      conn.commit();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'a', 'a1', 'a2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (2, 'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'c', 'c1', 'c2')");
+      conn.commit();
+      String sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 'b1'";
+      // Make sure the plan really goes through the index before asserting on its metrics.
+      ExplainPlan explainPlan =
+        stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql).getExplainPlan();
+      ExplainPlanAttributes planAttributes = explainPlan.getPlanStepsAsAttributes();
+      String tableNameFromExplainPlan = planAttributes.getTableName();
+      Assert.assertEquals(indexName, tableNameFromExplainPlan);
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(indexName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      TestUtil.flush(utility, TableName.valueOf(indexName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data block from index table and 1 data block from data table as index is uncovered
+      assertOnReadsFromFs(indexName, getQueryReadMetrics(rs), 2);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(indexName, getQueryReadMetrics(rs));
+    }
+  }
+
+  /**
+   * Checks the read metrics of a query served entirely by a covered index ({@code INCLUDE (v2)})
+   * when no index read repair is triggered; only the index table's block is expected after the
+   * flush.
+   */
+  @Test
+  public void testQueryOnCoveredIndexWithoutReadRepair() throws Exception {
+    String tableName = generateUniqueName();
+    String indexName = generateUniqueName();
+    // The statement is a try-with-resources resource so that it, and any result set it still
+    // holds open, is released even when an assertion fails.
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) {
+      createTable(conn, tableName, "");
+      stmt.execute("CREATE INDEX " + indexName + " ON " + tableName + " (v1) INCLUDE (v2)");
+      conn.commit();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'a', 'a1', 'a2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (2, 'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'c', 'c1', 'c2')");
+      conn.commit();
+      String sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 'b1'";
+      // Make sure the plan really goes through the index before asserting on its metrics.
+      ExplainPlan explainPlan =
+        stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql).getExplainPlan();
+      ExplainPlanAttributes planAttributes = explainPlan.getPlanStepsAsAttributes();
+      String tableNameFromExplainPlan = planAttributes.getTableName();
+      Assert.assertEquals(indexName, tableNameFromExplainPlan);
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(indexName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      TestUtil.flush(utility, TableName.valueOf(indexName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block from index table
+      assertOnReadsFromFs(indexName, getQueryReadMetrics(rs), 1);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(indexName, getQueryReadMetrics(rs));
+    }
+  }
+
+  /**
+   * Checks the read metrics of covered-index queries whose rows need index read repair. The rows
+   * are written while post-index updates are forced to fail, so each later query on a fresh
+   * {@code v1} value has to repair the row it touches.
+   */
+  @Test
+  public void testQueryOnCoveredIndexWithReadRepair() throws Exception {
+    String tableName = generateUniqueName();
+    String indexName = generateUniqueName();
+    // The statement is a try-with-resources resource so that it, and any result set it still
+    // holds open, is released even when an assertion fails.
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) {
+      createTable(conn, tableName, "");
+      stmt.execute("CREATE INDEX " + indexName + " ON " + tableName + " (v1) INCLUDE (v2)");
+      conn.commit();
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 'a', 'a1', 'a2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (2, 'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 'c', 'c1', 'c2')");
+      conn.commit();
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+      TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+      String sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 'b1'";
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(indexName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      TestUtil.flush(utility, TableName.valueOf(indexName));
+      sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 'c1'";
+      rs = stmt.executeQuery(sql);
+      // Expected block reads: 1 data block of the index table from GlobalIndexScanner, 1 bloom
+      // block of the data table during read repair, and the same data block of the data table
+      // twice during read repair. Read repair opens the region scanner three times; the second
+      // time caching to the block cache is disabled and the third time it is enabled. The newly
+      // repaired column qualifier is in the memstore of the index table, and GlobalIndexScanner
+      // verifies that the row was repaired correctly, so a read from the memstore happens too.
+      assertOnReadsFromFs(indexName, getQueryReadMetrics(rs), 4, true, true);
+      sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 'a1'";
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(indexName, getQueryReadMetrics(rs), true);
+    } finally {
+      // Always restore the failure-injection flag so that it cannot leak into other tests.
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+    }
+  }
+
+  /**
+   * Runs many concurrent full-table scans with a tiny fetch size and checks that the scan RPC
+   * queue wait time and the scan RPC processing time, summed over all scans, are both non-zero.
+   * <p>
+   * A failure on any scan thread (SQL error, wrong row count, missing metric) fails the test.
+   */
+  @Test
+  public void testScanRpcQueueWaitTime() throws Exception {
+    int handlerCount =
+      Integer.parseInt(utility.getConfiguration().get(HConstants.REGION_SERVER_HANDLER_COUNT));
+    // NOTE(review): using more scan threads than RPC handlers is presumably what forces scan
+    // RPCs to wait in the queue - confirm against the cluster setup of this test class.
+    int threadPoolSize = 6 * handlerCount;
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadPoolSize);
+    String tableName = generateUniqueName();
+    int numRows = 10000;
+    CountDownLatch latch = new CountDownLatch(threadPoolSize);
+    try (Connection conn = DriverManager.getConnection(getUrl());
+      Statement stmt = conn.createStatement()) {
+      createTable(conn, tableName, "");
+      for (int i = 1; i <= numRows; i++) {
+        stmt.execute(
+          "UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (" + i + ", 'a', 'v1', 'v2')");
+        if (i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      AtomicLong scanRpcQueueWaitTime = new AtomicLong(0);
+      AtomicLong rpcScanProcessingTime = new AtomicLong(0);
+      // First failure seen on any scan thread. Anything thrown inside an executor task never
+      // reaches the JUnit thread on its own, so it is captured here and rethrown below.
+      java.util.concurrent.atomic.AtomicReference<Throwable> workerFailure =
+        new java.util.concurrent.atomic.AtomicReference<>();
+      for (int i = 0; i < threadPoolSize; i++) {
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            try (Statement scanStmt = conn.createStatement()) {
+              // A fetch size of 2 makes every scan issue a large number of scan RPCs.
+              scanStmt.setFetchSize(2);
+              try (ResultSet rs = scanStmt.executeQuery("SELECT * FROM " + tableName)) {
+                int rowsRead = 0;
+                while (rs.next()) {
+                  rowsRead++;
+                }
+                Assert.assertEquals(numRows, rowsRead);
+                Map<String, Map<MetricType, Long>> readMetrics =
+                  PhoenixRuntime.getRequestReadMetricInfo(rs);
+                scanRpcQueueWaitTime
+                  .addAndGet(readMetrics.get(tableName).get(MetricType.RPC_SCAN_QUEUE_WAIT_TIME));
+                rpcScanProcessingTime
+                  .addAndGet(readMetrics.get(tableName).get(MetricType.RPC_SCAN_PROCESSING_TIME));
+              }
+            } catch (Throwable t) {
+              // Throwable on purpose: AssertionError from the row-count check must be kept too.
+              workerFailure.compareAndSet(null, t);
+            } finally {
+              latch.countDown();
+            }
+          }
+        });
+      }
+      latch.await();
+      Throwable failure = workerFailure.get();
+      if (failure != null) {
+        throw new AssertionError("A concurrent scan task failed", failure);
+      }
+      Assert.assertTrue(scanRpcQueueWaitTime.get() > 0);
+      Assert.assertTrue(rpcScanProcessingTime.get() > 0);
+    } finally {
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+    }
+  }
+
+  /**
+   * Creates a table with the fixed test schema (primary key {@code (k1, k2)} plus the VARCHAR
+   * columns {@code v1} and {@code v2}) and commits.
+   *
+   * @param conn       connection used to run the DDL
+   * @param tableName  name of the table to create
+   * @param ddlOptions text appended verbatim after the column definitions; may be empty
+   */
+  private void createTable(Connection conn, String tableName, String ddlOptions) throws Exception {
+    String ddl = "CREATE TABLE " + tableName
+      + " (k1 INTEGER NOT NULL, k2 varchar NOT NULL, v1 VARCHAR, v2 VARCHAR, "
+      + "CONSTRAINT PK PRIMARY KEY (k1, k2)) " + ddlOptions;
+    try (Statement ddlStmt = conn.createStatement()) {
+      ddlStmt.execute(ddl);
+      conn.commit();
+    }
+  }
+
+  /**
+   * Creates the test table and inserts three rows, {@code (1, 'a')}, {@code (2, 'b')} and
+   * {@code (3, 'c')}, each with {@code v1 = 'v1'} and {@code v2 = 'v2'}.
+   *
+   * @param conn       connection used for the DDL and the upserts
+   * @param tableName  name of the table to create and fill
+   * @param ddlOptions passed through to {@code createTable}
+   */
+  private void createTableAndUpsertData(Connection conn, String tableName, String ddlOptions)
+    throws Exception {
+    createTable(conn, tableName, ddlOptions);
+    String[] primaryKeys = { "1, 'a'", "2, 'b'", "3, 'c'" };
+    try (Statement upsertStmt = conn.createStatement()) {
+      for (String primaryKey : primaryKeys) {
+        upsertStmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (" + primaryKey
+          + ", 'v1', 'v2')");
+        // Every row is committed on its own.
+        conn.commit();
+      }
+    }
+  }
+
+  /**
+   * Asserts that the read for {@code tableName} was served only from the memstore: memstore
+   * bytes are positive while file-system and block-cache bytes are zero.
+   *
+   * @param tableName   key of the table (or index) to look up in {@code readMetrics}
+   * @param readMetrics per-table read metrics
+   */
+  private void assertOnReadsFromMemstore(String tableName,
+    Map<String, Map<MetricType, Long>> readMetrics) throws Exception {
+    Map<MetricType, Long> tableMetrics = readMetrics.get(tableName);
+    long memstoreBytes = tableMetrics.get(MetricType.BYTES_READ_FROM_MEMSTORE);
+    Assert.assertTrue(memstoreBytes > 0);
+    long fsBytes = tableMetrics.get(MetricType.BYTES_READ_FROM_FS);
+    Assert.assertEquals(0, fsBytes);
+    long blockCacheBytes = tableMetrics.get(MetricType.BYTES_READ_FROM_BLOCKCACHE);
+    Assert.assertEquals(0, blockCacheBytes);
+  }
+
+  /**
+   * Asserts a read served from the file system only: no block-cache bytes and no memstore bytes
+   * are expected.
+   *
+   * @param tableName             key of the table (or index) to look up in {@code readMetrics}
+   * @param readMetrics           per-table read metrics
+   * @param expectedBlocksReadOps exact expected value of {@code BLOCK_READ_OPS_COUNT}
+   */
+  private void assertOnReadsFromFs(String tableName, Map<String, Map<MetricType, Long>> readMetrics,
+    long expectedBlocksReadOps) throws Exception {
+    boolean isReadFromBlockCacheExpected = false;
+    boolean isReadFromMemstoreExpected = false;
+    assertOnReadsFromFs(tableName, readMetrics, expectedBlocksReadOps,
+      isReadFromBlockCacheExpected, isReadFromMemstoreExpected);
+  }
+
+  /**
+   * Asserts a read served from the file system, optionally together with the block cache; no
+   * memstore bytes are expected.
+   *
+   * @param tableName                    key of the table (or index) in {@code readMetrics}
+   * @param readMetrics                  per-table read metrics
+   * @param expectedBlocksReadOps        exact expected value of {@code BLOCK_READ_OPS_COUNT}
+   * @param isReadFromBlockCacheExpected whether block-cache bytes must be positive (else zero)
+   */
+  private void assertOnReadsFromFs(String tableName, Map<String, Map<MetricType, Long>> readMetrics,
+    long expectedBlocksReadOps, boolean isReadFromBlockCacheExpected) throws Exception {
+    boolean isReadFromMemstoreExpected = false;
+    assertOnReadsFromFs(tableName, readMetrics, expectedBlocksReadOps,
+      isReadFromBlockCacheExpected, isReadFromMemstoreExpected);
+  }
+
+  /**
+   * Asserts the metrics of a read that had to go to the file system: positive file-system
+   * bytes, the exact number of block read operations, and positive scan RPC processing time and
+   * file-system read time. Memstore and block-cache bytes must be positive when the matching
+   * flag is set and exactly zero otherwise.
+   *
+   * @param tableName                    key of the table (or index) in {@code readMetrics}
+   * @param readMetrics                  per-table read metrics
+   * @param expectedBlocksReadOps        exact expected value of {@code BLOCK_READ_OPS_COUNT}
+   * @param isReadFromBlockCacheExpected whether block-cache bytes must be positive (else zero)
+   * @param isReadFromMemstoreExpected   whether memstore bytes must be positive (else zero)
+   */
+  private void assertOnReadsFromFs(String tableName, Map<String, Map<MetricType, Long>> readMetrics,
+    long expectedBlocksReadOps, boolean isReadFromBlockCacheExpected,
+    boolean isReadFromMemstoreExpected) throws Exception {
+    Map<MetricType, Long> tableMetrics = readMetrics.get(tableName);
+    Assert.assertTrue(tableMetrics.get(MetricType.BYTES_READ_FROM_FS) > 0);
+    Assert.assertEquals(expectedBlocksReadOps,
+      tableMetrics.get(MetricType.BLOCK_READ_OPS_COUNT).longValue());
+
+    long memstoreBytes = tableMetrics.get(MetricType.BYTES_READ_FROM_MEMSTORE);
+    if (!isReadFromMemstoreExpected) {
+      Assert.assertEquals(0, memstoreBytes);
+    } else {
+      Assert.assertTrue(memstoreBytes > 0);
+    }
+
+    long blockCacheBytes = tableMetrics.get(MetricType.BYTES_READ_FROM_BLOCKCACHE);
+    if (!isReadFromBlockCacheExpected) {
+      Assert.assertEquals(0, blockCacheBytes);
+    } else {
+      Assert.assertTrue(blockCacheBytes > 0);
+    }
+
+    Assert.assertTrue(tableMetrics.get(MetricType.RPC_SCAN_PROCESSING_TIME) > 0);
+    Assert.assertTrue(tableMetrics.get(MetricType.FS_READ_TIME) > 0);
+  }
+
+  /**
+   * Asserts a read served from the block cache with no memstore bytes expected.
+   *
+   * @param tableName   key of the table (or index) to look up in {@code readMetrics}
+   * @param readMetrics per-table read metrics
+   */
+  private void assertOnReadsFromBlockcache(String tableName,
+    Map<String, Map<MetricType, Long>> readMetrics) throws Exception {
+    boolean isReadFromMemstoreExpected = false;
+    assertOnReadsFromBlockcache(tableName, readMetrics, isReadFromMemstoreExpected);
+  }
+
+  /**
+   * Asserts the metrics of a read served from the block cache: positive block-cache bytes, zero
+   * file-system bytes and zero block read operations. Memstore bytes must be positive when
+   * {@code isReadFromMemstoreExpected} is set and exactly zero otherwise.
+   *
+   * @param tableName                  key of the table (or index) in {@code readMetrics}
+   * @param readMetrics                per-table read metrics
+   * @param isReadFromMemstoreExpected whether memstore bytes must be positive (else zero)
+   */
+  private void assertOnReadsFromBlockcache(String tableName,
+    Map<String, Map<MetricType, Long>> readMetrics, boolean isReadFromMemstoreExpected)
+    throws Exception {
+    Map<MetricType, Long> tableMetrics = readMetrics.get(tableName);
+    Assert.assertTrue(tableMetrics.get(MetricType.BYTES_READ_FROM_BLOCKCACHE) > 0);
+    Assert.assertEquals(0, tableMetrics.get(MetricType.BYTES_READ_FROM_FS).longValue());
+    Assert.assertEquals(0, tableMetrics.get(MetricType.BLOCK_READ_OPS_COUNT).longValue());
+
+    long memstoreBytes = tableMetrics.get(MetricType.BYTES_READ_FROM_MEMSTORE);
+    if (!isReadFromMemstoreExpected) {
+      Assert.assertEquals(0, memstoreBytes);
+    } else {
+      Assert.assertTrue(memstoreBytes > 0);
+    }
+  }
+
+  /**
+   * Drains {@code rs}, asserts that it produced at least one row, and returns the request-level
+   * read metrics that {@code PhoenixRuntime} reports for it. The metrics are also printed to
+   * standard output.
+   *
+   * @param rs result set of the query under test; it is iterated to the end but not closed here
+   * @return the per-table read metrics of the query
+   */
+  private Map<String, Map<MetricType, Long>> getQueryReadMetrics(ResultSet rs) throws Exception {
+    boolean sawAtLeastOneRow = false;
+    // The scan has to be consumed completely before the metrics are read.
+    while (rs.next()) {
+      sawAtLeastOneRow = true;
+    }
+    Assert.assertTrue(sawAtLeastOneRow);
+    Map<String, Map<MetricType, Long>> metricsByTable = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    System.out.println("Query readMetrics: " + metricsByTable);
+    return metricsByTable;
+  }
+
+  /**
+   * Runs an {@code UPSERT ... SELECT} that copies the row with {@code k1 = rowId} from the source
+   * table into the target table, commits, and returns the read metrics collected for mutations
+   * since the last reset. The connection's metrics are reset before returning.
+   *
+   * @param conn            connection used for the upsert; its metrics are reset afterwards
+   * @param targetTableName table written to
+   * @param sourceTableName table read from
+   * @param rowId           value of {@code k1} selecting the row to copy
+   * @return the per-table read metrics of the mutation
+   */
+  private Map<String, Map<MetricType, Long>> getMutationReadMetrics(Connection conn,
+    String targetTableName, String sourceTableName, int rowId) throws Exception {
+    // try-with-resources: the statement used to be leaked on every call.
+    try (Statement stmt = conn.createStatement()) {
+      stmt.execute("UPSERT INTO " + targetTableName + " (k1, k2, v1, v2) SELECT * FROM "
+        + sourceTableName + " WHERE k1 = " + rowId);
+      conn.commit();
+      Map<String, Map<MetricType, Long>> readMetrics =
+        PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(conn);
+      System.out.println("Mutation readMetrics: " + readMetrics);
+      // Reset so that the next call only sees the metrics of its own mutation.
+      PhoenixRuntime.resetMetrics(conn);
+      return readMetrics;
+    }
+  }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 0d6e1c5f8e..4a457174f0 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -46,8 +46,13 @@ import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
import static
org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.BLOCK_READ_OPS_COUNT;
+import static
org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_BLOCKCACHE;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_FS;
+import static
org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_MEMSTORE;
import static
org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.FS_READ_TIME;
import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
@@ -55,6 +60,8 @@ import static
org.apache.phoenix.monitoring.MetricType.QUERY_COMPILER_TIME_MS;
import static org.apache.phoenix.monitoring.MetricType.QUERY_OPTIMIZER_TIME_MS;
import static
org.apache.phoenix.monitoring.MetricType.QUERY_RESULT_ITR_TIME_MS;
import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static
org.apache.phoenix.monitoring.MetricType.RPC_SCAN_PROCESSING_TIME;
+import static
org.apache.phoenix.monitoring.MetricType.RPC_SCAN_QUEUE_WAIT_TIME;
import static
org.apache.phoenix.monitoring.MetricType.SQL_QUERY_PARSING_TIME_MS;
import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
@@ -137,7 +144,9 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME,
DELETE_COMMIT_TIME,
MUTATION_BATCH_COUNTER, SQL_QUERY_PARSING_TIME_MS);
private static final List<MetricType> readMetricsToSkip =
Lists.newArrayList(TASK_QUEUE_WAIT_TIME,
- TASK_EXECUTION_TIME, TASK_END_TO_END_TIME, COUNT_MILLS_BETWEEN_NEXTS);
+ TASK_EXECUTION_TIME, TASK_END_TO_END_TIME, COUNT_MILLS_BETWEEN_NEXTS,
RPC_SCAN_PROCESSING_TIME,
+ RPC_SCAN_QUEUE_WAIT_TIME, FS_READ_TIME, BYTES_READ_FROM_FS,
BYTES_READ_FROM_MEMSTORE,
+ BYTES_READ_FROM_BLOCKCACHE, BLOCK_READ_OPS_COUNT);
private static final String CUSTOM_URL_STRING = "SESSION";
private static final AtomicInteger numConnections = new AtomicInteger(0);
static final String POINT_LOOKUP_SELECT_QUERY = "SELECT J, G, E, (NOW() -
I)*24*60*60*1000 FROM"
diff --git
a/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..93dcc0cf93
--- /dev/null
+++
b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+/**
+ * Compatibility shim for reading HBase scan metrics.
+ * <p>
+ * In this compat module every accessor is a stub: {@link #supportsFineGrainedReadMetrics()}
+ * returns {@code false} and every metric getter ignores its argument and returns zero.
+ */
+public class CompatScanMetrics {
+  /** Value returned by every metric getter in this module. */
+  private static final long NO_METRIC = 0L;
+
+  private CompatScanMetrics() {
+    // Static helpers only; not to be instantiated
+  }
+
+  /** @return always {@code false} in this module */
+  public static boolean supportsFineGrainedReadMetrics() {
+    return false;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getFsReadTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getFsReadTime(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromFs(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+}
diff --git
a/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..89151d1fe3
--- /dev/null
+++
b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+/**
+ * Compatibility shim for recording server-side scan metrics.
+ * <p>
+ * In this compat module every method is an intentional no-op: the supplied value is discarded.
+ */
+public class CompatThreadLocalServerSideScanMetrics {
+  private CompatThreadLocalServerSideScanMetrics() {
+    // Static helpers only; not to be instantiated
+  }
+
+  /** No-op; {@code fsReadTimeInMs} is ignored. */
+  public static void addFsReadTime(long fsReadTimeInMs) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code bytesReadFromFS} is ignored. */
+  public static void addBytesReadFromFs(long bytesReadFromFS) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code bytesReadFromMemstore} is ignored. */
+  public static void addBytesReadFromMemstore(long bytesReadFromMemstore) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code bytesReadFromBlockCache} is ignored. */
+  public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code blockReadOpsCount} is ignored. */
+  public static void addBlockReadOpsCount(long blockReadOpsCount) {
+    // intentionally empty
+  }
+}
diff --git
a/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..93dcc0cf93
--- /dev/null
+++
b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+/**
+ * Compatibility shim for reading HBase scan metrics.
+ * <p>
+ * In this compat module every accessor is a stub: {@link #supportsFineGrainedReadMetrics()}
+ * returns {@code false} and every metric getter ignores its argument and returns zero.
+ */
+public class CompatScanMetrics {
+  /** Value returned by every metric getter in this module. */
+  private static final long NO_METRIC = 0L;
+
+  private CompatScanMetrics() {
+    // Static helpers only; not to be instantiated
+  }
+
+  /** @return always {@code false} in this module */
+  public static boolean supportsFineGrainedReadMetrics() {
+    return false;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getFsReadTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getFsReadTime(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromFs(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+}
diff --git
a/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..89151d1fe3
--- /dev/null
+++
b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+/**
+ * Compatibility shim for recording server-side scan metrics.
+ * <p>
+ * In this compat module every method is an intentional no-op: the supplied value is discarded.
+ */
+public class CompatThreadLocalServerSideScanMetrics {
+  private CompatThreadLocalServerSideScanMetrics() {
+    // Static helpers only; not to be instantiated
+  }
+
+  /** No-op; {@code fsReadTimeInMs} is ignored. */
+  public static void addFsReadTime(long fsReadTimeInMs) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code bytesReadFromFS} is ignored. */
+  public static void addBytesReadFromFs(long bytesReadFromFS) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code bytesReadFromMemstore} is ignored. */
+  public static void addBytesReadFromMemstore(long bytesReadFromMemstore) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code bytesReadFromBlockCache} is ignored. */
+  public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) {
+    // intentionally empty
+  }
+
+  /** No-op; {@code blockReadOpsCount} is ignored. */
+  public static void addBlockReadOpsCount(long blockReadOpsCount) {
+    // intentionally empty
+  }
+}
diff --git
a/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..93dcc0cf93
--- /dev/null
+++
b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+/**
+ * Compatibility shim for reading HBase scan metrics.
+ * <p>
+ * In this compat module every accessor is a stub: {@link #supportsFineGrainedReadMetrics()}
+ * returns {@code false} and every metric getter ignores its argument and returns zero.
+ */
+public class CompatScanMetrics {
+  /** Value returned by every metric getter in this module. */
+  private static final long NO_METRIC = 0L;
+
+  private CompatScanMetrics() {
+    // Static helpers only; not to be instantiated
+  }
+
+  /** @return always {@code false} in this module */
+  public static boolean supportsFineGrainedReadMetrics() {
+    return false;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getFsReadTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getFsReadTime(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromFs(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+
+  /** @return always zero; {@code scanMetrics} is ignored */
+  public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) {
+    return NO_METRIC;
+  }
+}
diff --git
a/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..89151d1fe3
--- /dev/null
+++
b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+public class CompatThreadLocalServerSideScanMetrics { // No-op variant: every add* method below has an empty body.
+ private CompatThreadLocalServerSideScanMetrics() {
+ // Not to be instantiated: the class exposes static methods only
+ }
+
+ public static void addFsReadTime(long fsReadTimeInMs) { // Value is discarded; presumably this HBase line has no thread-local scan metrics - confirm.
+ }
+
+ public static void addBytesReadFromFs(long bytesReadFromFS) { // Empty body: the value is discarded.
+ }
+
+ public static void addBytesReadFromMemstore(long bytesReadFromMemstore) { // Empty body: the value is discarded.
+ }
+
+ public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) { // Empty body: the value is discarded.
+ }
+
+ public static void addBlockReadOpsCount(long blockReadOpsCount) { // Empty body: the value is discarded.
+ }
+}
diff --git a/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..4026ceed05
--- /dev/null
+++ b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import static
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME;
+import static
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME;
+import static
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME;
+import static
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME;
+import static
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
+
+public class CompatScanMetrics { // Static readers for scan metrics, keyed by the ServerSideScanMetrics metric-name constants.
+
+ private CompatScanMetrics() {
+ // Not to be instantiated: the class exposes static methods only
+ }
+
+ public static boolean supportsFineGrainedReadMetrics() { // Capability flag; this variant always answers true.
+ return true;
+ }
+
+ public static Long getFsReadTime(Map<String, Long> scanMetrics) { // Map lookup; 0 when the map has no entry for the name.
+ return
scanMetrics.getOrDefault(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME, 0L);
+ }
+
+ public static Long getFsReadTime(ScanMetrics scanMetrics) { // Counter lookup via getCounterValue; 0 when the counter is missing.
+ return getCounterValue(scanMetrics, FS_READ_TIME_METRIC_NAME);
+ }
+
+ public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) { // Map lookup; 0 when the map has no entry for the name.
+ return
scanMetrics.getOrDefault(ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME,
0L);
+ }
+
+ public static Long getBytesReadFromFs(ScanMetrics scanMetrics) { // Counter lookup via getCounterValue; 0 when the counter is missing.
+ return getCounterValue(scanMetrics, BYTES_READ_FROM_FS_METRIC_NAME);
+ }
+
+ public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) { // Map lookup; 0 when the map has no entry for the name.
+ return
scanMetrics.getOrDefault(ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME,
0L);
+ }
+
+ public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) { // Counter lookup via getCounterValue; 0 when the counter is missing.
+ return getCounterValue(scanMetrics, BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
+ }
+
+ public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics)
{ // Map lookup; 0 when the map has no entry for the name.
+ return
scanMetrics.getOrDefault(ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME,
+ 0L);
+ }
+
+ public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) { // Counter lookup via getCounterValue; 0 when the counter is missing.
+ return getCounterValue(scanMetrics,
BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
+ }
+
+ public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) { // Map lookup; 0 when the map has no entry for the name.
+ return
scanMetrics.getOrDefault(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME,
0L);
+ }
+
+ public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) { // Counter lookup via getCounterValue; 0 when the counter is missing.
+ return getCounterValue(scanMetrics, BLOCK_READ_OPS_COUNT_METRIC_NAME);
+ }
+
+ public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) { // Map-only: no ScanMetrics overload is declared for this metric.
+ return
scanMetrics.getOrDefault(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
0L);
+ }
+
+ public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) { // Map-only: no ScanMetrics overload is declared for this metric.
+ return
scanMetrics.getOrDefault(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
0L);
+ }
+
+ private static Long getCounterValue(ScanMetrics scanMetrics, String
metricName) { // Shared helper: a null counter yields 0; scanMetrics itself is dereferenced, so it must be non-null.
+ AtomicLong counter = scanMetrics.getCounter(metricName);
+ return counter != null ? counter.get() : 0L;
+ }
+}
diff --git a/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..ab3b0a11e4
--- /dev/null
+++ b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
+
+public class CompatThreadLocalServerSideScanMetrics { // Static pass-through to HBase's ThreadLocalServerSideScanMetrics.
+ private CompatThreadLocalServerSideScanMetrics() {
+ // Not to be instantiated: the class exposes static methods only
+ }
+
+ public static void addFsReadTime(long fsReadTimeInMs) { // Forwards the value unchanged; presumably milliseconds, per the parameter name.
+ ThreadLocalServerSideScanMetrics.addFsReadTime(fsReadTimeInMs);
+ }
+
+ public static void addBytesReadFromFs(long bytesReadFromFS) { // Forwards the value unchanged to the same-named HBase method.
+ ThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesReadFromFS);
+ }
+
+ public static void addBytesReadFromMemstore(long bytesReadFromMemstore) { // Forwards the value unchanged to the same-named HBase method.
+
ThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(bytesReadFromMemstore);
+ }
+
+ public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) { // Forwards the value unchanged to the same-named HBase method.
+
ThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(bytesReadFromBlockCache);
+ }
+
+ public static void addBlockReadOpsCount(long blockReadOpsCount) { // Forwards the value unchanged to the same-named HBase method.
+ ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(blockReadOpsCount);
+ }
+}