This is an automated email from the ASF dual-hosted git repository.

sanjeet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 93203b0812 PHOENIX-7704: Integrate HBase's new Scan Latency Metrics (#2329)
93203b0812 is described below

commit 93203b081269ca5ed7eeaee85d0d13204d3f7ee4
Author: sanjeet006py <[email protected]>
AuthorDate: Thu Dec 18 09:48:21 2025 +0530

    PHOENIX-7704: Integrate HBase's new Scan Latency Metrics (#2329)
---
 .../phoenix/iterate/ScanningResultIterator.java    |  15 +
 .../org/apache/phoenix/monitoring/MetricType.java  |  12 +
 .../phoenix/monitoring/ScanMetricsHolder.java      |  50 +-
 .../phoenix/coprocessor/DataTableScanMetrics.java  | 124 +++++
 .../UncoveredGlobalIndexRegionScanner.java         |  73 +++
 .../apache/phoenix/index/GlobalIndexChecker.java   |  18 +
 .../phoenix/monitoring/HBaseScanMetricsIT.java     | 542 +++++++++++++++++++++
 .../phoenix/monitoring/PhoenixMetricsIT.java       |  11 +-
 .../phoenix/compat/hbase/CompatScanMetrics.java    |  79 +++
 .../CompatThreadLocalServerSideScanMetrics.java    |  39 ++
 .../phoenix/compat/hbase/CompatScanMetrics.java    |  79 +++
 .../CompatThreadLocalServerSideScanMetrics.java    |  39 ++
 .../phoenix/compat/hbase/CompatScanMetrics.java    |  79 +++
 .../CompatThreadLocalServerSideScanMetrics.java    |  39 ++
 .../phoenix/compat/hbase/CompatScanMetrics.java    |  94 ++++
 .../CompatThreadLocalServerSideScanMetrics.java    |  46 ++
 16 files changed, 1337 insertions(+), 2 deletions(-)
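
For context, a minimal client-side sketch of how the new metrics can be read once this change is applied, assuming request-level metrics are enabled via QueryServices.COLLECT_REQUEST_LEVEL_METRICS (this mirrors what HBaseScanMetricsIT below does); the JDBC URL, table name, and query are placeholders:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;

public class ScanMetricsSketch {
  public static void printScanMetrics(String jdbcUrl) throws Exception {
    Properties props = new Properties();
    // Request-level metrics must be on for per-table read metrics to be collected.
    props.setProperty(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
    try (Connection conn = DriverManager.getConnection(jdbcUrl, props);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("SELECT * FROM MY_TABLE WHERE K1 = 1")) {
      while (rs.next()) {
        // Consume all rows so the scanner metrics are fully accumulated.
      }
      Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
      Map<MetricType, Long> tableMetrics = readMetrics.get("MY_TABLE");
      System.out.println("FS read time (ms): " + tableMetrics.get(MetricType.FS_READ_TIME));
      System.out.println("Bytes read from FS: " + tableMetrics.get(MetricType.BYTES_READ_FROM_FS));
      System.out.println("Bytes read from blockcache: "
        + tableMetrics.get(MetricType.BYTES_READ_FROM_BLOCKCACHE));
      System.out.println("RPC scan queue wait (ms): "
        + tableMetrics.get(MetricType.RPC_SCAN_QUEUE_WAIT_TIME));
    }
  }
}

On HBase versions whose compat module reports supportsFineGrainedReadMetrics() as false (for example the 2.5.x shims below), these metric keys are still reported but remain 0.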

diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
index dbca9f8b96..420d0d570f 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/ScanningResultIterator.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
 import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
 import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.exception.SQLExceptionCode;
@@ -163,6 +164,20 @@ public class ScanningResultIterator implements ResultIterator {
       changeMetric(scanMetricsHolder.getCountOfBytesScanned(),
         scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
       changeMetric(scanMetricsHolder.getCountOfRowsPaged(), dummyRowCounter);
+      changeMetric(scanMetricsHolder.getFsReadTime(),
+        CompatScanMetrics.getFsReadTime(scanMetricsMap));
+      changeMetric(scanMetricsHolder.getCountOfBytesReadFromFS(),
+        CompatScanMetrics.getBytesReadFromFs(scanMetricsMap));
+      changeMetric(scanMetricsHolder.getCountOfBytesReadFromMemstore(),
+        CompatScanMetrics.getBytesReadFromMemstore(scanMetricsMap));
+      changeMetric(scanMetricsHolder.getCountOfBytesReadFromBlockcache(),
+        CompatScanMetrics.getBytesReadFromBlockCache(scanMetricsMap));
+      changeMetric(scanMetricsHolder.getCountOfBlockReadOps(),
+        CompatScanMetrics.getBlockReadOpsCount(scanMetricsMap));
+      changeMetric(scanMetricsHolder.getRpcScanProcessingTime(),
+        CompatScanMetrics.getRpcScanProcessingTime(scanMetricsMap));
+      changeMetric(scanMetricsHolder.getRpcScanQueueWaitTime(),
+        CompatScanMetrics.getRpcScanQueueWaitTime(scanMetricsMap));
 
       changeMetric(GLOBAL_SCAN_BYTES, scanMetricsMap.get(BYTES_IN_RESULTS_METRIC_NAME));
       changeMetric(GLOBAL_HBASE_COUNT_RPC_CALLS, scanMetricsMap.get(RPC_CALLS_METRIC_NAME));
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
index 8ee8de6971..85cdb2d3bf 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/MetricType.java
@@ -261,6 +261,18 @@ public enum MetricType {
   COUNT_REMOTE_RPC_RETRIES("rrr", "Number of remote RPC retries", LogLevel.DEBUG, PLong.INSTANCE),
   COUNT_ROWS_SCANNED("ws", "Number of rows scanned", LogLevel.DEBUG, PLong.INSTANCE),
   COUNT_ROWS_FILTERED("wf", "Number of rows filtered", LogLevel.DEBUG, PLong.INSTANCE),
+  FS_READ_TIME("frt", "Time spent in filesystem read", LogLevel.DEBUG, PLong.INSTANCE),
+  BYTES_READ_FROM_FS("brff", "Number of bytes read from filesystem", LogLevel.DEBUG,
+    PLong.INSTANCE),
+  BYTES_READ_FROM_MEMSTORE("brfm", "Number of bytes read from memstore", LogLevel.DEBUG,
+    PLong.INSTANCE),
+  BYTES_READ_FROM_BLOCKCACHE("brfc", "Number of bytes read from blockcache", LogLevel.DEBUG,
+    PLong.INSTANCE),
+  BLOCK_READ_OPS_COUNT("broc", "Number of block read operations", LogLevel.DEBUG, PLong.INSTANCE),
+  RPC_SCAN_PROCESSING_TIME("spt", "Time spent in RPC scan processing", LogLevel.DEBUG,
+    PLong.INSTANCE),
+  RPC_SCAN_QUEUE_WAIT_TIME("sqwt", "Time spent in RPC scan queue wait", LogLevel.DEBUG,
+    PLong.INSTANCE),
   COUNTER_METADATA_INCONSISTENCY("mi", "Number of times the metadata inconsistencies ",
     LogLevel.DEBUG, PLong.INSTANCE),
   NUM_SYSTEM_TABLE_RPC_SUCCESS("nstrs", "Number of successful system table RPC calls",
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
index ea82467aa8..0c827602af 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/monitoring/ScanMetricsHolder.java
@@ -17,6 +17,10 @@
  */
 package org.apache.phoenix.monitoring;
 
+import static org.apache.phoenix.monitoring.MetricType.BLOCK_READ_OPS_COUNT;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_BLOCKCACHE;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_FS;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_MEMSTORE;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_IN_REMOTE_RESULTS;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_BYTES_REGION_SERVER_RESULTS;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
@@ -28,7 +32,10 @@ import static org.apache.phoenix.monitoring.MetricType.COUNT_ROWS_SCANNED;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_CALLS;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_RPC_RETRIES;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_SCANNED_REGIONS;
+import static org.apache.phoenix.monitoring.MetricType.FS_READ_TIME;
 import static org.apache.phoenix.monitoring.MetricType.PAGED_ROWS_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RPC_SCAN_PROCESSING_TIME;
+import static org.apache.phoenix.monitoring.MetricType.RPC_SCAN_QUEUE_WAIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.SCAN_BYTES;
 
 import java.io.IOException;
@@ -52,6 +59,13 @@ public class ScanMetricsHolder {
   private final CombinableMetric countOfRowsFiltered;
   private final CombinableMetric countOfBytesScanned;
   private final CombinableMetric countOfRowsPaged;
+  private final CombinableMetric fsReadTime;
+  private final CombinableMetric countOfBytesReadFromFS;
+  private final CombinableMetric countOfBytesReadFromMemstore;
+  private final CombinableMetric countOfBytesReadFromBlockcache;
+  private final CombinableMetric countOfBlockReadOps;
+  private final CombinableMetric rpcScanProcessingTime;
+  private final CombinableMetric rpcScanQueueWaitTime;
   private Map<String, Long> scanMetricMap;
   private Object scan;
 
@@ -83,6 +97,13 @@ public class ScanMetricsHolder {
     countOfRowsFiltered = readMetrics.allotMetric(COUNT_ROWS_FILTERED, tableName);
     countOfBytesScanned = readMetrics.allotMetric(SCAN_BYTES, tableName);
     countOfRowsPaged = readMetrics.allotMetric(PAGED_ROWS_COUNTER, tableName);
+    fsReadTime = readMetrics.allotMetric(FS_READ_TIME, tableName);
+    countOfBytesReadFromFS = readMetrics.allotMetric(BYTES_READ_FROM_FS, tableName);
+    countOfBytesReadFromMemstore = readMetrics.allotMetric(BYTES_READ_FROM_MEMSTORE, tableName);
+    countOfBytesReadFromBlockcache = readMetrics.allotMetric(BYTES_READ_FROM_BLOCKCACHE, tableName);
+    countOfBlockReadOps = readMetrics.allotMetric(BLOCK_READ_OPS_COUNT, tableName);
+    rpcScanProcessingTime = readMetrics.allotMetric(RPC_SCAN_PROCESSING_TIME, tableName);
+    rpcScanQueueWaitTime = readMetrics.allotMetric(RPC_SCAN_QUEUE_WAIT_TIME, tableName);
   }
 
   public CombinableMetric getCountOfRemoteRPCcalls() {
@@ -141,6 +162,34 @@ public class ScanMetricsHolder {
     return countOfRowsPaged;
   }
 
+  public CombinableMetric getFsReadTime() {
+    return fsReadTime;
+  }
+
+  public CombinableMetric getCountOfBytesReadFromFS() {
+    return countOfBytesReadFromFS;
+  }
+
+  public CombinableMetric getCountOfBytesReadFromMemstore() {
+    return countOfBytesReadFromMemstore;
+  }
+
+  public CombinableMetric getCountOfBytesReadFromBlockcache() {
+    return countOfBytesReadFromBlockcache;
+  }
+
+  public CombinableMetric getCountOfBlockReadOps() {
+    return countOfBlockReadOps;
+  }
+
+  public CombinableMetric getRpcScanProcessingTime() {
+    return rpcScanProcessingTime;
+  }
+
+  public CombinableMetric getRpcScanQueueWaitTime() {
+    return rpcScanQueueWaitTime;
+  }
+
   public void setScanMetricMap(Map<String, Long> scanMetricMap) {
     this.scanMetricMap = scanMetricMap;
   }
@@ -154,5 +203,4 @@ public class ScanMetricsHolder {
       return "{\"Exception while converting scan metrics to Json\":\"" + 
e.getMessage() + "\"}";
     }
   }
-
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DataTableScanMetrics.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DataTableScanMetrics.java
new file mode 100644
index 0000000000..e72216586c
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DataTableScanMetrics.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.coprocessor;
+
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
+import org.apache.phoenix.compat.hbase.CompatThreadLocalServerSideScanMetrics;
+
+/**
+ * Stores scan metrics from data table operations performed during:
+ * <ul>
+ * <li>Uncovered global index scans</li>
+ * <li>Read repair operations</li>
+ * </ul>
+ * These metrics help identify latency variations that occur when both data table and index table
+ * are scanned together, and are used to populate {@link ThreadLocalServerSideScanMetrics} for index
+ * table RPC calls.
+ */
+public class DataTableScanMetrics {
+  private final long fsReadTimeInMs;
+  private final long bytesReadFromFS;
+  private final long bytesReadFromMemstore;
+  private final long bytesReadFromBlockcache;
+  private final long blockReadOps;
+
+  protected DataTableScanMetrics(long fsReadTimeInMs, long bytesReadFromFS,
+    long bytesReadFromMemstore, long bytesReadFromBlockcache, long blockReadOps) {
+    this.fsReadTimeInMs = fsReadTimeInMs;
+    this.bytesReadFromFS = bytesReadFromFS;
+    this.bytesReadFromMemstore = bytesReadFromMemstore;
+    this.bytesReadFromBlockcache = bytesReadFromBlockcache;
+    this.blockReadOps = blockReadOps;
+  }
+
+  public long getFsReadTimeInMs() {
+    return fsReadTimeInMs;
+  }
+
+  public long getBytesReadFromFS() {
+    return bytesReadFromFS;
+  }
+
+  public long getBytesReadFromMemstore() {
+    return bytesReadFromMemstore;
+  }
+
+  public long getBytesReadFromBlockcache() {
+    return bytesReadFromBlockcache;
+  }
+
+  public long getBlockReadOps() {
+    return blockReadOps;
+  }
+
+  public static class Builder {
+    protected long fsReadTimeInMs = 0;
+    protected long bytesReadFromFS = 0;
+    protected long bytesReadFromMemstore = 0;
+    protected long bytesReadFromBlockcache = 0;
+    protected long blockReadOps = 0;
+
+    public Builder setFsReadTimeInMs(long fsReadTimeInMs) {
+      this.fsReadTimeInMs = fsReadTimeInMs;
+      return this;
+    }
+
+    public Builder setBytesReadFromFS(long bytesReadFromFS) {
+      this.bytesReadFromFS = bytesReadFromFS;
+      return this;
+    }
+
+    public Builder setBytesReadFromMemstore(long bytesReadFromMemstore) {
+      this.bytesReadFromMemstore = bytesReadFromMemstore;
+      return this;
+    }
+
+    public Builder setBytesReadFromBlockcache(long bytesReadFromBlockcache) {
+      this.bytesReadFromBlockcache = bytesReadFromBlockcache;
+      return this;
+    }
+
+    public Builder setBlockReadOps(long blockReadOps) {
+      this.blockReadOps = blockReadOps;
+      return this;
+    }
+
+    public DataTableScanMetrics build() {
+      return new DataTableScanMetrics(fsReadTimeInMs, bytesReadFromFS, bytesReadFromMemstore,
+        bytesReadFromBlockcache, blockReadOps);
+    }
+  }
+
+  public static void populateDataTableScanMetrics(ScanMetrics scanMetrics, Builder builder) {
+    builder.setFsReadTimeInMs(CompatScanMetrics.getFsReadTime(scanMetrics))
+      .setBytesReadFromFS(CompatScanMetrics.getBytesReadFromFs(scanMetrics))
+      .setBytesReadFromMemstore(CompatScanMetrics.getBytesReadFromMemstore(scanMetrics))
+      .setBytesReadFromBlockcache(CompatScanMetrics.getBytesReadFromBlockCache(scanMetrics))
+      .setBlockReadOps(CompatScanMetrics.getBlockReadOpsCount(scanMetrics));
+      .setBlockReadOps(CompatScanMetrics.getBlockReadOpsCount(scanMetrics));
+  }
+
+  public void populateThreadLocalServerSideScanMetrics() {
+    CompatThreadLocalServerSideScanMetrics.addFsReadTime(fsReadTimeInMs);
+    CompatThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesReadFromFS);
+    CompatThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(bytesReadFromMemstore);
+    CompatThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(bytesReadFromBlockcache);
+    CompatThreadLocalServerSideScanMetrics.addBlockReadOpsCount(blockReadOps);
+  }
+}
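
As a usage note, a condensed sketch of how this holder is meant to be driven from a server-side scanner, assuming scan metrics are enabled on the data table scan; it follows the same pattern GlobalIndexChecker and UncoveredGlobalIndexRegionScanner use further down in this diff, and the table/scan variables are illustrative:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.coprocessor.DataTableScanMetrics;

class DataTableScanMetricsUsageSketch {
  void scanDataTable(Table dataHTable, Scan dataTableScan) throws IOException {
    // Enable metrics on the inner data table scan so the ResultScanner exposes them.
    dataTableScan.setScanMetricsEnabled(true);
    try (ResultScanner scanner = dataHTable.getScanner(dataTableScan)) {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        // ... consume the data table rows ...
      }
      // Capture the data table read metrics into the holder.
      DataTableScanMetrics.Builder builder = new DataTableScanMetrics.Builder();
      DataTableScanMetrics.populateDataTableScanMetrics(scanner.getScanMetrics(), builder);
      // Re-publish them on the RPC handler thread so they are attributed to the
      // index table scan that triggered the data table read.
      builder.build().populateThreadLocalServerSideScanMetrics();
    }
  }
}

Callers in this patch guard this work with scan.isScanMetricsEnabled() && CompatScanMetrics.supportsFineGrainedReadMetrics(), so the extra bookkeeping is skipped on HBase versions that lack the fine-grained counters.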
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
index e0c9f6298f..0a8a0adcde 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UncoveredGlobalIndexRegionScanner.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.coprocessor;
 import static org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants.PHYSICAL_DATA_TABLE_NAME;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
@@ -31,12 +32,14 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
 import org.apache.phoenix.hbase.index.parallel.Task;
@@ -71,6 +74,8 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
   protected final int rowCountPerTask;
   protected String exceptionMessage;
   protected final HTableFactory hTableFactory;
+  private final boolean isScanMetricsEnabled;
+  private List<DataTableScanMetricsWithScanTime> dataTableScanMetrics;
 
   // This relies on Hadoop Configuration to handle warning about deprecated configs and
   // to set the correct non-deprecated configs when an old one shows up.
@@ -101,6 +106,11 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
       ScanUtil.addEmptyColumnToScan(dataTableScan, indexMaintainer.getDataEmptyKeyValueCF(),
         indexMaintainer.getEmptyKeyValueQualifierForDataTable());
     }
+    isScanMetricsEnabled =
+      scan.isScanMetricsEnabled() && CompatScanMetrics.supportsFineGrainedReadMetrics();
+    if (isScanMetricsEnabled) {
+      dataTableScanMetrics = new ArrayList<>();
+    }
   }
 
   @Override
@@ -117,6 +127,7 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
     if (dataScan == null) {
       return;
     }
+    dataScan.setScanMetricsEnabled(isScanMetricsEnabled);
     try (ResultScanner resultScanner = dataHTable.getScanner(dataScan)) {
       for (Result result = resultScanner.next(); (result != null); result = resultScanner.next()) {
         if (ScanUtil.isDummy(result)) {
@@ -134,6 +145,13 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
           + region.getRegionInfo().getRegionNameAsString() + " could not complete on time (in "
           + pageSizeMs + " ms) and" + " will be resubmitted");
       }
+      if (isScanMetricsEnabled) {
+        ScanMetrics scanMetrics = resultScanner.getScanMetrics();
+        long scanTimeInMs = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+        // Capture scan time to identify slowest parallel scan later as that's the one which slows
+        // down the whole merge operation from data table.
+        dataTableScanMetrics.add(buildDataTableScanMetrics(scanMetrics, scanTimeInMs));
+      }
     } catch (Throwable t) {
       exceptionMessage = "scanDataRows fails for at least one task";
       ClientUtil.throwIOException(dataHTable.getName().toString(), t);
@@ -208,10 +226,65 @@ public class UncoveredGlobalIndexRegionScanner extends UncoveredIndexRegionScann
       addTasksForScanningDataTableRowsInParallel(tasks, setList.get(i), startTime);
     }
     submitTasks(tasks);
+    if (isScanMetricsEnabled) {
+      DataTableScanMetricsWithScanTime dataTableScanMetricsForSlowestScan = null;
+      for (DataTableScanMetricsWithScanTime dataTableScanMetrics : dataTableScanMetrics) {
+        if (dataTableScanMetricsForSlowestScan == null) {
+          dataTableScanMetricsForSlowestScan = dataTableScanMetrics;
+        } else if (
+          dataTableScanMetricsForSlowestScan.getScanTimeInMs()
+              < dataTableScanMetrics.getScanTimeInMs()
+        ) {
+          dataTableScanMetricsForSlowestScan = dataTableScanMetrics;
+        }
+      }
+      if (dataTableScanMetricsForSlowestScan != null) {
+        dataTableScanMetricsForSlowestScan.populateThreadLocalServerSideScanMetrics();
+      }
+    }
     if (state == State.SCANNING_DATA_INTERRUPTED) {
       state = State.SCANNING_DATA;
     } else {
       state = State.READY;
     }
   }
+
+  private static DataTableScanMetricsWithScanTime buildDataTableScanMetrics(ScanMetrics scanMetrics,
+    long scanTimeInMs) {
+    DataTableScanMetricsWithScanTime.Builder builder =
+      new DataTableScanMetricsWithScanTime.Builder();
+    builder.setScanTimeInMs(scanTimeInMs);
+    DataTableScanMetrics.populateDataTableScanMetrics(scanMetrics, builder);
+    return builder.build();
+  }
+
+  private static class DataTableScanMetricsWithScanTime extends DataTableScanMetrics {
+    private final long scanTimeInMs;
+
+    DataTableScanMetricsWithScanTime(long scanTimeInMs, long fsReadTimeInMs, long bytesReadFromFS,
+      long bytesReadFromMemstore, long bytesReadFromBlockcache, long blockReadOps) {
+      super(fsReadTimeInMs, bytesReadFromFS, bytesReadFromMemstore, bytesReadFromBlockcache,
+        blockReadOps);
+      this.scanTimeInMs = scanTimeInMs;
+    }
+
+    long getScanTimeInMs() {
+      return scanTimeInMs;
+    }
+
+    private static class Builder extends DataTableScanMetrics.Builder {
+      private long scanTimeInMs = 0;
+
+      public Builder setScanTimeInMs(long scanTimeInMs) {
+        this.scanTimeInMs = scanTimeInMs;
+        return this;
+      }
+
+      @Override
+      public DataTableScanMetricsWithScanTime build() {
+        return new DataTableScanMetricsWithScanTime(scanTimeInMs, fsReadTimeInMs, bytesReadFromFS,
+          bytesReadFromMemstore, bytesReadFromBlockcache, blockReadOps);
+      }
+    }
+  }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index 97f23c498f..7b0a2e3f82 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -56,8 +57,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.compat.hbase.CompatScanMetrics;
 import org.apache.phoenix.coprocessor.BaseRegionScanner;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
+import org.apache.phoenix.coprocessor.DataTableScanMetrics;
 import org.apache.phoenix.coprocessor.DelegateRegionScanner;
 import org.apache.phoenix.coprocessorclient.BaseScannerRegionObserverConstants;
 import org.apache.phoenix.filter.EmptyColumnOnlyFilter;
@@ -155,6 +158,7 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
     private String indexName;
     private long pageSizeMs;
     private boolean initialized = false;
+    private boolean isScanMetricsEnabled = false;
 
     public GlobalIndexScanner(RegionCoprocessorEnvironment env, Scan scan, RegionScanner scanner,
       GlobalIndexCheckerSource metricsSource) throws IOException {
@@ -186,6 +190,8 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
         DEFAULT_REPAIR_LOGGING_PERCENT);
       random = new Random(EnvironmentEdgeManager.currentTimeMillis());
       pageSizeMs = getPageSizeMsForRegionScanner(scan);
+      isScanMetricsEnabled =
+        scan.isScanMetricsEnabled() && CompatScanMetrics.supportsFineGrainedReadMetrics();
     }
 
     @Override
@@ -343,6 +349,12 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
       return scanner.getMvccReadPoint();
     }
 
+    private DataTableScanMetrics buildDataTableScanMetrics(ScanMetrics scanMetrics) {
+      DataTableScanMetrics.Builder builder = new DataTableScanMetrics.Builder();
+      DataTableScanMetrics.populateDataTableScanMetrics(scanMetrics, builder);
+      return builder.build();
+    }
+
     private void repairIndexRows(byte[] indexRowKey, long ts, List<Cell> row) throws IOException {
       if (buildIndexScanForDataTable == null) {
         buildIndexScanForDataTable = new Scan();
@@ -400,8 +412,14 @@ public class GlobalIndexChecker extends BaseScannerRegionObserver implements Reg
       buildIndexScanForDataTable.setAttribute(BaseScannerRegionObserverConstants.INDEX_ROW_KEY,
         indexRowKey);
       Result result = null;
+      buildIndexScanForDataTable.setScanMetricsEnabled(isScanMetricsEnabled);
       try (ResultScanner resultScanner = dataHTable.getScanner(buildIndexScanForDataTable)) {
         result = resultScanner.next();
+        if (isScanMetricsEnabled) {
+          ScanMetrics scanMetrics = resultScanner.getScanMetrics();
+          DataTableScanMetrics dataTableScanMetrics = buildDataTableScanMetrics(scanMetrics);
+          dataTableScanMetrics.populateThreadLocalServerSideScanMetrics();
+        }
       } catch (Throwable t) {
         ClientUtil.throwIOException(dataHTable.getName().toString(), t);
       }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/HBaseScanMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/HBaseScanMetricsIT.java
new file mode 100644
index 0000000000..a8f86f7b27
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/HBaseScanMetricsIT.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.monitoring;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.CompactSplit;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.phoenix.compile.ExplainPlan;
+import org.apache.phoenix.compile.ExplainPlanAttributes;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.hbase.index.IndexRegionObserver;
+import org.apache.phoenix.jdbc.PhoenixStatement;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.util.PhoenixRuntime;
+import org.apache.phoenix.util.ReadOnlyProps;
+import org.apache.phoenix.util.TestUtil;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
+
+@Category(NeedsOwnMiniClusterTest.class)
+public class HBaseScanMetricsIT extends BaseTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    Assume.assumeTrue(VersionInfo.compareVersion(VersionInfo.getVersion(), "2.6.3") > 0);
+    Map<String, String> props = Maps.newHashMapWithExpectedSize(4);
+    props.put(QueryServices.COLLECT_REQUEST_LEVEL_METRICS, "true");
+    props.put(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, "0");
+    props.put(CompactSplit.HBASE_REGION_SERVER_ENABLE_COMPACTION, "false");
+    setUpTestDriver(new ReadOnlyProps(props));
+  }
+
+  @Test
+  public void testSinglePointLookupQuery() throws Exception {
+    String tableName = generateUniqueName();
+    String sql = "SELECT * FROM " + tableName + " WHERE k1 = 1 AND k2 = 'a'";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTableAndUpsertData(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block + 1 Bloom Block
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 2);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testMultiPointLookupQueryWithoutBloomFilter() throws Exception {
+    String tableName = generateUniqueName();
+    String sql = "SELECT * FROM " + tableName + " WHERE k1 IN (1, 2, 3) AND k2 
IN ('a', 'b', 'c')";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTableAndUpsertData(conn, tableName, "BLOOMFILTER='NONE'");
+      Statement stmt = conn.createStatement();
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testMultiPointLookupQueryWithBloomFilter() throws Exception {
+    String tableName = generateUniqueName();
+    String sql = "SELECT * FROM " + tableName + " WHERE k1 IN (1, 2, 3) AND k2 
IN ('a', 'b', 'c')";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTableAndUpsertData(conn, tableName,
+        "\"phoenix.bloomfilter.multikey.pointlookup\"=true");
+      Statement stmt = conn.createStatement();
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block + 1 Bloom Block
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 2, true);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testBytesReadInClientUpsertSelect() throws Exception {
+    String sourceTableName = generateUniqueName();
+    String targetTableName = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTableAndUpsertData(conn, sourceTableName, "");
+      createTable(conn, targetTableName, "");
+      assertOnReadsFromMemstore(sourceTableName,
+        getMutationReadMetrics(conn, targetTableName, sourceTableName, 1));
+      TestUtil.flush(utility, TableName.valueOf(sourceTableName));
+      // 1 Data Block from source table
+      assertOnReadsFromFs(sourceTableName,
+        getMutationReadMetrics(conn, targetTableName, sourceTableName, 2), 1);
+      assertOnReadsFromBlockcache(sourceTableName,
+        getMutationReadMetrics(conn, targetTableName, sourceTableName, 3));
+    }
+  }
+
+  @Test
+  public void testAggregateQueryWithoutGroupBy() throws Exception {
+    String tableName = generateUniqueName();
+    String sql = "SELECT MAX(v1) FROM " + tableName + " WHERE k1 = 1";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'a2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'c', 'c1', 'c2')");
+      conn.commit();
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block from table
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testAggregateQueryWithGroupBy() throws Exception {
+    String tableName = generateUniqueName();
+    String sql = "SELECT MAX(v1) FROM " + tableName + " WHERE v2 = 'v2' GROUP 
BY k1";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'c', 'c1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'd', 'd1', 'd2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'e', 'e1', 'v2')");
+      conn.commit();
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block from table
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testAggregateQueryWithGroupByAndOrderBy() throws Exception {
+    String tableName = generateUniqueName();
+    String sql = "SELECT v2, MAX(v1) FROM " + tableName + " GROUP BY v2 ORDER 
BY v2";
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'c', 'c1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'd', 'd1', 'd2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'e', 'e1', 'v2')");
+      conn.commit();
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block from table
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testUnionAllQuery() throws Exception {
+    String tableName1 = generateUniqueName();
+    String tableName2 = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName1, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("UPSERT INTO " + tableName1 + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName1 + " (k1, k2, v1, v2) VALUES (1, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName1 + " (k1, k2, v1, v2) VALUES (1, 
'c', 'c1', 'v2')");
+      conn.commit();
+      createTable(conn, tableName2, "");
+      stmt.execute("UPSERT INTO " + tableName2 + " (k1, k2, v1, v2) VALUES (3, 
'd', 'd1', 'd2')");
+      stmt.execute("UPSERT INTO " + tableName2 + " (k1, k2, v1, v2) VALUES (3, 
'e', 'e1', 'v2')");
+      conn.commit();
+      String sql = "SELECT MAX(v1) FROM (SELECT k1, v1 FROM " + tableName1
+        + " UNION ALL SELECT k1, v1 FROM " + tableName2 + ") GROUP BY k1 
HAVING MAX(v1) > 'c1'";
+      ResultSet rs = stmt.executeQuery(sql);
+      Map<String, Map<MetricType, Long>> readMetrics = getQueryReadMetrics(rs);
+      assertOnReadsFromMemstore(tableName1, readMetrics);
+      assertOnReadsFromMemstore(tableName2, readMetrics);
+      TestUtil.flush(utility, TableName.valueOf(tableName1));
+      TestUtil.flush(utility, TableName.valueOf(tableName2));
+      rs = stmt.executeQuery(sql);
+      // 1 Data block per table in UNION ALL
+      readMetrics = getQueryReadMetrics(rs);
+      assertOnReadsFromFs(tableName1, readMetrics, 1);
+      assertOnReadsFromFs(tableName2, readMetrics, 1);
+      rs = stmt.executeQuery(sql);
+      readMetrics = getQueryReadMetrics(rs);
+      assertOnReadsFromBlockcache(tableName1, readMetrics);
+      assertOnReadsFromBlockcache(tableName2, readMetrics);
+    }
+  }
+
+  @Test
+  public void testJoinQuery() throws Exception {
+    String tableName = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'c', 'c1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'd', 'd1', 'd2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'e', 'e1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'f', 'f1', 'v2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (5, 
'g', 'g1', 'v2')");
+      conn.commit();
+      String sql =
+        "SELECT a.k1 as k1, b.k2 as k2, b.v1 as v1, a.total_count as 
total_count FROM (SELECT k1, COUNT(*) as total_count FROM "
+          + tableName + " WHERE k1 IN (1, 3) GROUP BY k1) a JOIN (SELECT k1, 
k2, v1 FROM "
+          + tableName + " WHERE k1 IN (1, 3) AND k2 = 'a') b ON a.k1 = b.k1";
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(tableName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block for left table and data block for right table is read from block cache
+      assertOnReadsFromFs(tableName, getQueryReadMetrics(rs), 1, true);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(tableName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testQueryOnUncoveredIndex() throws Exception {
+    String tableName = generateUniqueName();
+    String indexName = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("CREATE UNCOVERED INDEX " + indexName + " ON " + tableName 
+ " (v1)");
+      conn.commit();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'a2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (2, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'c', 'c1', 'c2')");
+      conn.commit();
+      String sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 
'b1'";
+      ExplainPlan explainPlan =
+        stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql).getExplainPlan();
+      ExplainPlanAttributes planAttributes = explainPlan.getPlanStepsAsAttributes();
+      String tableNameFromExplainPlan = planAttributes.getTableName();
+      Assert.assertEquals(indexName, tableNameFromExplainPlan);
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(indexName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      TestUtil.flush(utility, TableName.valueOf(indexName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data block from index table and 1 data block from data table as index is uncovered
+      assertOnReadsFromFs(indexName, getQueryReadMetrics(rs), 2);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(indexName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testQueryOnCoveredIndexWithoutReadRepair() throws Exception {
+    String tableName = generateUniqueName();
+    String indexName = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("CREATE INDEX " + indexName + " ON " + tableName + " (v1) 
INCLUDE (v2)");
+      conn.commit();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'a2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (2, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'c', 'c1', 'c2')");
+      conn.commit();
+      String sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 
'b1'";
+      ExplainPlan explainPlan =
+        stmt.unwrap(PhoenixStatement.class).optimizeQuery(sql).getExplainPlan();
+      ExplainPlanAttributes planAttributes = explainPlan.getPlanStepsAsAttributes();
+      String tableNameFromExplainPlan = planAttributes.getTableName();
+      Assert.assertEquals(indexName, tableNameFromExplainPlan);
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(indexName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      TestUtil.flush(utility, TableName.valueOf(indexName));
+      rs = stmt.executeQuery(sql);
+      // 1 Data Block from index table
+      assertOnReadsFromFs(indexName, getQueryReadMetrics(rs), 1);
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(indexName, getQueryReadMetrics(rs));
+    }
+  }
+
+  @Test
+  public void testQueryOnCoveredIndexWithReadRepair() throws Exception {
+    String tableName = generateUniqueName();
+    String indexName = generateUniqueName();
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      stmt.execute("CREATE INDEX " + indexName + " ON " + tableName + " (v1) 
INCLUDE (v2)");
+      conn.commit();
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(true);
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'a1', 'a2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (2, 
'b', 'b1', 'b2')");
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'c', 'c1', 'c2')");
+      conn.commit();
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+      TestUtil.dumpTable(conn, TableName.valueOf(indexName));
+      String sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 
'b1'";
+      ResultSet rs = stmt.executeQuery(sql);
+      assertOnReadsFromMemstore(indexName, getQueryReadMetrics(rs));
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      TestUtil.flush(utility, TableName.valueOf(indexName));
+      sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 'c1'";
+      rs = stmt.executeQuery(sql);
+      // 1 data block of the index table from GlobalIndexScanner, 1 bloom block of the data table
+      // during read repair, and the same data block of the data table twice during read repair,
+      // because read repair opens the region scanner thrice: the second time with caching to the
+      // block cache disabled and the third time with caching enabled. The newly repaired column
+      // qualifier will be in the memstore of the index table, and GlobalIndexScanner verifies that
+      // the row was repaired correctly, so reads from the memstore happen as well.
+      assertOnReadsFromFs(indexName, getQueryReadMetrics(rs), 4, true, true);
+      sql = "SELECT k1, k2, v1, v2 FROM " + tableName + " WHERE v1 = 'a1'";
+      rs = stmt.executeQuery(sql);
+      assertOnReadsFromBlockcache(indexName, getQueryReadMetrics(rs), true);
+    } finally {
+      IndexRegionObserver.setFailPostIndexUpdatesForTesting(false);
+    }
+  }
+
+  @Test
+  public void testScanRpcQueueWaitTime() throws Exception {
+    int handlerCount =
+      Integer.parseInt(utility.getConfiguration().get(HConstants.REGION_SERVER_HANDLER_COUNT));
+    int threadPoolSize = 6 * handlerCount;
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(threadPoolSize);
+    String tableName = generateUniqueName();
+    int numRows = 10000;
+    CountDownLatch latch = new CountDownLatch(threadPoolSize);
+    try (Connection conn = DriverManager.getConnection(getUrl())) {
+      createTable(conn, tableName, "");
+      Statement stmt = conn.createStatement();
+      for (int i = 1; i <= numRows; i++) {
+        stmt.execute(
+          "UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (" + i + ", 
'a', 'v1', 'v2')");
+        if (i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      TestUtil.flush(utility, TableName.valueOf(tableName));
+      AtomicLong scanRpcQueueWaitTime = new AtomicLong(0);
+      AtomicLong rpcScanProcessingTime = new AtomicLong(0);
+      for (int i = 0; i < threadPoolSize; i++) {
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              Statement stmt = conn.createStatement();
+              stmt.setFetchSize(2);
+              ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName);
+              int rowsRead = 0;
+              while (rs.next()) {
+                rowsRead++;
+              }
+              Assert.assertEquals(numRows, rowsRead);
+              Map<String, Map<MetricType, Long>> readMetrics =
+                PhoenixRuntime.getRequestReadMetricInfo(rs);
+              scanRpcQueueWaitTime
+                .addAndGet(readMetrics.get(tableName).get(MetricType.RPC_SCAN_QUEUE_WAIT_TIME));
+              rpcScanProcessingTime
+                .addAndGet(readMetrics.get(tableName).get(MetricType.RPC_SCAN_PROCESSING_TIME));
+            } catch (SQLException e) {
+              throw new RuntimeException(e);
+            } finally {
+              latch.countDown();
+            }
+          }
+        });
+      }
+      latch.await();
+      Assert.assertTrue(scanRpcQueueWaitTime.get() > 0);
+      Assert.assertTrue(rpcScanProcessingTime.get() > 0);
+    } finally {
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+    }
+  }
+
+  private void createTable(Connection conn, String tableName, String ddlOptions) throws Exception {
+    try (Statement stmt = conn.createStatement()) {
+      stmt.execute("CREATE TABLE " + tableName
+        + " (k1 INTEGER NOT NULL, k2 varchar NOT NULL, v1 VARCHAR, v2 VARCHAR, 
CONSTRAINT PK PRIMARY KEY (k1, k2)) "
+        + ddlOptions);
+      conn.commit();
+    }
+  }
+
+  private void createTableAndUpsertData(Connection conn, String tableName, String ddlOptions)
+    throws Exception {
+    createTable(conn, tableName, ddlOptions);
+    try (Statement stmt = conn.createStatement()) {
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (1, 
'a', 'v1', 'v2')");
+      conn.commit();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (2, 
'b', 'v1', 'v2')");
+      conn.commit();
+      stmt.execute("UPSERT INTO " + tableName + " (k1, k2, v1, v2) VALUES (3, 
'c', 'v1', 'v2')");
+      conn.commit();
+    }
+  }
+
+  private void assertOnReadsFromMemstore(String tableName,
+    Map<String, Map<MetricType, Long>> readMetrics) throws Exception {
+    Assert.assertTrue(readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_MEMSTORE) > 0);
+    Assert.assertEquals(0, (long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_FS));
+    Assert.assertEquals(0,
+      (long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_BLOCKCACHE));
+  }
+
+  private void assertOnReadsFromFs(String tableName, Map<String, Map<MetricType, Long>> readMetrics,
+    long expectedBlocksReadOps) throws Exception {
+    assertOnReadsFromFs(tableName, readMetrics, expectedBlocksReadOps, false);
+  }
+
+  private void assertOnReadsFromFs(String tableName, Map<String, Map<MetricType, Long>> readMetrics,
+    long expectedBlocksReadOps, boolean isReadFromBlockCacheExpected) throws Exception {
+    assertOnReadsFromFs(tableName, readMetrics, expectedBlocksReadOps, isReadFromBlockCacheExpected,
+      false);
+  }
+
+  private void assertOnReadsFromFs(String tableName, Map<String, Map<MetricType, Long>> readMetrics,
+    long expectedBlocksReadOps, boolean isReadFromBlockCacheExpected,
+    boolean isReadFromMemstoreExpected) throws Exception {
+    Assert.assertTrue(readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_FS) > 0);
+    Assert.assertEquals(expectedBlocksReadOps,
+      (long) readMetrics.get(tableName).get(MetricType.BLOCK_READ_OPS_COUNT));
+    if (isReadFromMemstoreExpected) {
+      Assert
+        .assertTrue((long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_MEMSTORE) > 0);
+    } else {
+      Assert.assertEquals(0,
+        (long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_MEMSTORE));
+    }
+    if (isReadFromBlockCacheExpected) {
+      Assert.assertTrue(
+        (long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_BLOCKCACHE) > 0);
+    } else {
+      Assert.assertEquals(0,
+        (long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_BLOCKCACHE));
+    }
+    Assert.assertTrue(readMetrics.get(tableName).get(MetricType.RPC_SCAN_PROCESSING_TIME) > 0);
+    Assert.assertTrue(readMetrics.get(tableName).get(MetricType.FS_READ_TIME) > 0);
+  }
+
+  private void assertOnReadsFromBlockcache(String tableName,
+    Map<String, Map<MetricType, Long>> readMetrics) throws Exception {
+    assertOnReadsFromBlockcache(tableName, readMetrics, false);
+  }
+
+  private void assertOnReadsFromBlockcache(String tableName,
+    Map<String, Map<MetricType, Long>> readMetrics, boolean isReadFromMemstoreExpected)
+    throws Exception {
+    Assert.assertTrue(readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_BLOCKCACHE) > 0);
+    Assert.assertEquals(0, (long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_FS));
+    Assert.assertEquals(0, (long) readMetrics.get(tableName).get(MetricType.BLOCK_READ_OPS_COUNT));
+    if (isReadFromMemstoreExpected) {
+      Assert
+        .assertTrue((long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_MEMSTORE) > 0);
+    } else {
+      Assert.assertEquals(0,
+        (long) readMetrics.get(tableName).get(MetricType.BYTES_READ_FROM_MEMSTORE));
+    }
+  }
+
+  private Map<String, Map<MetricType, Long>> getQueryReadMetrics(ResultSet rs) throws Exception {
+    int rowCount = 0;
+    while (rs.next()) {
+      rowCount++;
+    }
+    Assert.assertTrue(rowCount > 0);
+    Map<String, Map<MetricType, Long>> readMetrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+    System.out.println("Query readMetrics: " + readMetrics);
+    return readMetrics;
+  }
+
+  private Map<String, Map<MetricType, Long>> getMutationReadMetrics(Connection conn,
+    String targetTableName, String sourceTableName, int rowId) throws Exception {
+    Statement stmt = conn.createStatement();
+    stmt.execute("UPSERT INTO " + targetTableName + " (k1, k2, v1, v2) SELECT 
* FROM "
+      + sourceTableName + " WHERE k1 = " + rowId);
+    conn.commit();
+    Map<String, Map<MetricType, Long>> readMetrics =
+      PhoenixRuntime.getReadMetricInfoForMutationsSinceLastReset(conn);
+    System.out.println("Mutation readMetrics: " + readMetrics);
+    PhoenixRuntime.resetMetrics(conn);
+    return readMetrics;
+  }
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
index 0d6e1c5f8e..4a457174f0 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/monitoring/PhoenixMetricsIT.java
@@ -46,8 +46,13 @@ import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQ
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SPOOL_FILE_COUNTER;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_END_TO_END_TIME;
 import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_TASK_EXECUTION_TIME;
+import static org.apache.phoenix.monitoring.MetricType.BLOCK_READ_OPS_COUNT;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_BLOCKCACHE;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_FS;
+import static org.apache.phoenix.monitoring.MetricType.BYTES_READ_FROM_MEMSTORE;
 import static org.apache.phoenix.monitoring.MetricType.COUNT_MILLS_BETWEEN_NEXTS;
 import static org.apache.phoenix.monitoring.MetricType.DELETE_COMMIT_TIME;
+import static org.apache.phoenix.monitoring.MetricType.FS_READ_TIME;
 import static org.apache.phoenix.monitoring.MetricType.MEMORY_CHUNK_BYTES;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_BATCH_COUNTER;
 import static org.apache.phoenix.monitoring.MetricType.MUTATION_COMMIT_TIME;
@@ -55,6 +60,8 @@ import static org.apache.phoenix.monitoring.MetricType.QUERY_COMPILER_TIME_MS;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_OPTIMIZER_TIME_MS;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_RESULT_ITR_TIME_MS;
 import static org.apache.phoenix.monitoring.MetricType.QUERY_TIMEOUT_COUNTER;
+import static org.apache.phoenix.monitoring.MetricType.RPC_SCAN_PROCESSING_TIME;
+import static org.apache.phoenix.monitoring.MetricType.RPC_SCAN_QUEUE_WAIT_TIME;
 import static org.apache.phoenix.monitoring.MetricType.SQL_QUERY_PARSING_TIME_MS;
 import static org.apache.phoenix.monitoring.MetricType.TASK_END_TO_END_TIME;
 import static org.apache.phoenix.monitoring.MetricType.TASK_EXECUTED_COUNTER;
@@ -137,7 +144,9 @@ public class PhoenixMetricsIT extends BasePhoenixMetricsIT {
     Lists.newArrayList(MUTATION_COMMIT_TIME, UPSERT_COMMIT_TIME, DELETE_COMMIT_TIME,
       MUTATION_BATCH_COUNTER, SQL_QUERY_PARSING_TIME_MS);
   private static final List<MetricType> readMetricsToSkip = Lists.newArrayList(TASK_QUEUE_WAIT_TIME,
-    TASK_EXECUTION_TIME, TASK_END_TO_END_TIME, COUNT_MILLS_BETWEEN_NEXTS);
+    TASK_EXECUTION_TIME, TASK_END_TO_END_TIME, COUNT_MILLS_BETWEEN_NEXTS, RPC_SCAN_PROCESSING_TIME,
+    RPC_SCAN_QUEUE_WAIT_TIME, FS_READ_TIME, BYTES_READ_FROM_FS, BYTES_READ_FROM_MEMSTORE,
+    BYTES_READ_FROM_BLOCKCACHE, BLOCK_READ_OPS_COUNT);
   private static final String CUSTOM_URL_STRING = "SESSION";
   private static final AtomicInteger numConnections = new AtomicInteger(0);
   static final String POINT_LOOKUP_SELECT_QUERY = "SELECT J, G, E, (NOW() - I)*24*60*60*1000 FROM"
diff --git a/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..93dcc0cf93
--- /dev/null
+++ b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+public class CompatScanMetrics {
+  private CompatScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static boolean supportsFineGrainedReadMetrics() {
+    return false;
+  }
+
+  public static Long getFsReadTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getFsReadTime(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromFs(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+}
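
For the HBase 2.5.x and 2.6.0 compat profiles, CompatScanMetrics is a stub: supportsFineGrainedReadMetrics() returns false and every getter returns 0L, so callers see zeros rather than missing metrics. A minimal, hypothetical caller sketch under that assumption (the wrapper class and method below are illustrative, not part of this commit):

// Illustrative only: read a fine-grained metric through the compat shim so the
// same code compiles against every supported HBase profile. On pre-2.6.4
// profiles the getter below simply returns 0L.
import java.util.Map;
import org.apache.phoenix.compat.hbase.CompatScanMetrics;

public class ScanMetricsProbe {
  public static long bytesReadFromFs(Map<String, Long> scanMetricsMap) {
    if (!CompatScanMetrics.supportsFineGrainedReadMetrics()) {
      return 0L; // metric not published by this HBase version
    }
    return CompatScanMetrics.getBytesReadFromFs(scanMetricsMap);
  }
}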
diff --git a/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..89151d1fe3
--- /dev/null
+++ b/phoenix-hbase-compat-2.5.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+public class CompatThreadLocalServerSideScanMetrics {
+  private CompatThreadLocalServerSideScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static void addFsReadTime(long fsReadTimeInMs) {
+  }
+
+  public static void addBytesReadFromFs(long bytesReadFromFS) {
+  }
+
+  public static void addBytesReadFromMemstore(long bytesReadFromMemstore) {
+  }
+
+  public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) {
+  }
+
+  public static void addBlockReadOpsCount(long blockReadOpsCount) {
+  }
+}
diff --git a/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..93dcc0cf93
--- /dev/null
+++ b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+public class CompatScanMetrics {
+  private CompatScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static boolean supportsFineGrainedReadMetrics() {
+    return false;
+  }
+
+  public static Long getFsReadTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getFsReadTime(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromFs(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+}
diff --git a/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..89151d1fe3
--- /dev/null
+++ b/phoenix-hbase-compat-2.5.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+public class CompatThreadLocalServerSideScanMetrics {
+  private CompatThreadLocalServerSideScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static void addFsReadTime(long fsReadTimeInMs) {
+  }
+
+  public static void addBytesReadFromFs(long bytesReadFromFS) {
+  }
+
+  public static void addBytesReadFromMemstore(long bytesReadFromMemstore) {
+  }
+
+  public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) {
+  }
+
+  public static void addBlockReadOpsCount(long blockReadOpsCount) {
+  }
+}
diff --git a/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..93dcc0cf93
--- /dev/null
+++ b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import java.util.Map;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+
+public class CompatScanMetrics {
+  private CompatScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static boolean supportsFineGrainedReadMetrics() {
+    return false;
+  }
+
+  public static Long getFsReadTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getFsReadTime(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromFs(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+
+  public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) {
+    return 0L;
+  }
+}
diff --git a/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..89151d1fe3
--- /dev/null
+++ b/phoenix-hbase-compat-2.6.0/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+public class CompatThreadLocalServerSideScanMetrics {
+  private CompatThreadLocalServerSideScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static void addFsReadTime(long fsReadTimeInMs) {
+  }
+
+  public static void addBytesReadFromFs(long bytesReadFromFS) {
+  }
+
+  public static void addBytesReadFromMemstore(long bytesReadFromMemstore) {
+  }
+
+  public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) {
+  }
+
+  public static void addBlockReadOpsCount(long blockReadOpsCount) {
+  }
+}
diff --git a/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
new file mode 100644
index 0000000000..4026ceed05
--- /dev/null
+++ b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatScanMetrics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
+
+public class CompatScanMetrics {
+
+  private CompatScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static boolean supportsFineGrainedReadMetrics() {
+    return true;
+  }
+
+  public static Long getFsReadTime(Map<String, Long> scanMetrics) {
+    return scanMetrics.getOrDefault(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME, 0L);
+  }
+
+  public static Long getFsReadTime(ScanMetrics scanMetrics) {
+    return getCounterValue(scanMetrics, FS_READ_TIME_METRIC_NAME);
+  }
+
+  public static Long getBytesReadFromFs(Map<String, Long> scanMetrics) {
+    return scanMetrics.getOrDefault(ServerSideScanMetrics.BYTES_READ_FROM_FS_METRIC_NAME, 0L);
+  }
+
+  public static Long getBytesReadFromFs(ScanMetrics scanMetrics) {
+    return getCounterValue(scanMetrics, BYTES_READ_FROM_FS_METRIC_NAME);
+  }
+
+  public static Long getBytesReadFromMemstore(Map<String, Long> scanMetrics) {
+    return scanMetrics.getOrDefault(ServerSideScanMetrics.BYTES_READ_FROM_MEMSTORE_METRIC_NAME, 0L);
+  }
+
+  public static Long getBytesReadFromMemstore(ScanMetrics scanMetrics) {
+    return getCounterValue(scanMetrics, BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
+  }
+
+  public static Long getBytesReadFromBlockCache(Map<String, Long> scanMetrics) {
+    return scanMetrics.getOrDefault(ServerSideScanMetrics.BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME,
+      0L);
+  }
+
+  public static Long getBytesReadFromBlockCache(ScanMetrics scanMetrics) {
+    return getCounterValue(scanMetrics, BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
+  }
+
+  public static Long getBlockReadOpsCount(Map<String, Long> scanMetrics) {
+    return scanMetrics.getOrDefault(ServerSideScanMetrics.BLOCK_READ_OPS_COUNT_METRIC_NAME, 0L);
+  }
+
+  public static Long getBlockReadOpsCount(ScanMetrics scanMetrics) {
+    return getCounterValue(scanMetrics, BLOCK_READ_OPS_COUNT_METRIC_NAME);
+  }
+
+  public static Long getRpcScanProcessingTime(Map<String, Long> scanMetrics) {
+    return scanMetrics.getOrDefault(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME, 0L);
+  }
+
+  public static Long getRpcScanQueueWaitTime(Map<String, Long> scanMetrics) {
+    return scanMetrics.getOrDefault(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME, 0L);
+  }
+
+  private static Long getCounterValue(ScanMetrics scanMetrics, String metricName) {
+    AtomicLong counter = scanMetrics.getCounter(metricName);
+    return counter != null ? counter.get() : 0L;
+  }
+}
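
The 2.6.4 profile is the only one that delegates to real HBase counters: the Map overloads read the ServerSideScanMetrics metric names introduced in newer HBase, and the ScanMetrics overloads go through getCounterValue(), which falls back to 0L when a counter is absent. A hypothetical usage sketch (the wrapper class below is illustrative, not part of this commit):

// Illustrative only: dump the new per-scan read metrics from an HBase
// ScanMetrics instance via the 2.6.4 compat shim.
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.phoenix.compat.hbase.CompatScanMetrics;

public class ScanMetricsDump {
  public static String summarize(ScanMetrics metrics) {
    return "fsReadTimeMs=" + CompatScanMetrics.getFsReadTime(metrics)
      + ", bytesReadFromFs=" + CompatScanMetrics.getBytesReadFromFs(metrics)
      + ", bytesReadFromBlockCache=" + CompatScanMetrics.getBytesReadFromBlockCache(metrics)
      + ", blockReadOps=" + CompatScanMetrics.getBlockReadOpsCount(metrics);
  }
}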
diff --git a/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
new file mode 100644
index 0000000000..ab3b0a11e4
--- /dev/null
+++ b/phoenix-hbase-compat-2.6.4/src/main/java/org/apache/phoenix/compat/hbase/CompatThreadLocalServerSideScanMetrics.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.compat.hbase;
+
+import org.apache.hadoop.hbase.monitoring.ThreadLocalServerSideScanMetrics;
+
+public class CompatThreadLocalServerSideScanMetrics {
+  private CompatThreadLocalServerSideScanMetrics() {
+    // Not to be instantiated
+  }
+
+  public static void addFsReadTime(long fsReadTimeInMs) {
+    ThreadLocalServerSideScanMetrics.addFsReadTime(fsReadTimeInMs);
+  }
+
+  public static void addBytesReadFromFs(long bytesReadFromFS) {
+    ThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesReadFromFS);
+  }
+
+  public static void addBytesReadFromMemstore(long bytesReadFromMemstore) {
+    ThreadLocalServerSideScanMetrics.addBytesReadFromMemstore(bytesReadFromMemstore);
+  }
+
+  public static void addBytesReadFromBlockCache(long bytesReadFromBlockCache) {
+    ThreadLocalServerSideScanMetrics.addBytesReadFromBlockCache(bytesReadFromBlockCache);
+  }
+
+  public static void addBlockReadOpsCount(long blockReadOpsCount) {
+    ThreadLocalServerSideScanMetrics.addBlockReadOpsCount(blockReadOpsCount);
+  }
+}
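
CompatThreadLocalServerSideScanMetrics gives server-side code a version-safe way to record per-thread scan metrics: on the 2.6.4 profile it forwards to HBase's ThreadLocalServerSideScanMetrics, while on older profiles the calls are no-ops. A hypothetical server-side sketch under that assumption (the helper class below is illustrative, not part of this commit):

// Illustrative only: account for one HFile block read from the filesystem on
// the current scan thread. These calls are harmless no-ops on HBase profiles
// before 2.6.4.
import org.apache.phoenix.compat.hbase.CompatThreadLocalServerSideScanMetrics;

public class RegionScanAccounting {
  public static void recordFsBlockRead(long bytesRead, long readTimeMs) {
    CompatThreadLocalServerSideScanMetrics.addBytesReadFromFs(bytesRead);
    CompatThreadLocalServerSideScanMetrics.addFsReadTime(readTimeMs);
    CompatThreadLocalServerSideScanMetrics.addBlockReadOpsCount(1);
  }
}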
