This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 08fe1d3c7f7 HBASE-29233: Capture scan metrics at region level (#7132) 
(#6868)
08fe1d3c7f7 is described below

commit 08fe1d3c7f76cca9c45b4782442d425b6853c81f
Author: sanjeet006py <[email protected]>
AuthorDate: Thu Jul 3 00:40:40 2025 +0530

    HBASE-29233: Capture scan metrics at region level (#7132) (#6868)
    
    Signed-off-by: Viraj Jasani <[email protected]>
---
 .gitignore                                         |   1 +
 .../hadoop/hbase/client/AbstractClientScanner.java |  12 +
 .../hadoop/hbase/client/AsyncClientScanner.java    |  15 +
 .../apache/hadoop/hbase/client/ClientScanner.java  |  22 +-
 .../hadoop/hbase/client/ConnectionUtils.java       |  14 +-
 .../apache/hadoop/hbase/client/ImmutableScan.java  |  11 +
 .../java/org/apache/hadoop/hbase/client/Scan.java  |  26 +-
 .../hadoop/hbase/client/ScannerCallable.java       |   2 +-
 .../hbase/client/ScannerCallableWithReplicas.java  |   5 +
 .../client/metrics/RegionScanMetricsData.java      |  80 +++
 .../hadoop/hbase/client/metrics/ScanMetrics.java   |  16 +-
 .../client/metrics/ScanMetricsRegionInfo.java      |  82 +++
 .../hbase/client/metrics/ScanMetricsUtil.java      |  88 +++
 .../client/metrics/ServerSideScanMetrics.java      | 121 +++-
 .../hbase/client/ClientSideRegionScanner.java      |  11 +-
 .../hadoop/hbase/client/TableSnapshotScanner.java  |   3 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   9 +-
 .../hbase/regionserver/RegionScannerImpl.java      |   7 +-
 .../hbase/client/TestAsyncTableScanMetrics.java    | 111 +++-
 ...AsyncTableScanMetricsWithScannerSuspending.java | 162 +++++
 .../hbase/client/TestClientSideRegionScanner.java  |  85 +++
 .../hadoop/hbase/client/TestReplicasClient.java    |  80 +++
 .../hadoop/hbase/client/TestScanAttributes.java    |  50 ++
 .../hadoop/hbase/client/TestTableScanMetrics.java  | 710 +++++++++++++++++++++
 .../hbase/client/TestTableSnapshotScanner.java     |  84 +++
 25 files changed, 1750 insertions(+), 57 deletions(-)

diff --git a/.gitignore b/.gitignore
index 56ed1be0b81..666771f7e25 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,4 @@ linklint/
 .java-version
 tmp
 **/.flattened-pom.xml
+.vscode/
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 48cec12f43c..ece657deb0a 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -26,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public abstract class AbstractClientScanner implements ResultScanner {
   protected ScanMetrics scanMetrics;
+  private boolean isScanMetricsByRegionEnabled = false;
 
   /**
    * Check and initialize if application wants to collect scan metrics
@@ -34,6 +35,9 @@ public abstract class AbstractClientScanner implements 
ResultScanner {
     // check if application wants to collect scan metrics
     if (scan.isScanMetricsEnabled()) {
       scanMetrics = new ScanMetrics();
+      if (scan.isScanMetricsByRegionEnabled()) {
+        isScanMetricsByRegionEnabled = true;
+      }
     }
   }
 
@@ -46,4 +50,12 @@ public abstract class AbstractClientScanner implements 
ResultScanner {
   public ScanMetrics getScanMetrics() {
     return scanMetrics;
   }
+
+  protected void setIsScanMetricsByRegionEnabled(boolean 
isScanMetricsByRegionEnabled) {
+    this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
+  }
+
+  protected boolean isScanMetricsByRegionEnabled() {
+    return isScanMetricsByRegionEnabled;
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index b61f5b80c9e..d58c8e60c8d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -95,6 +95,8 @@ class AsyncClientScanner {
 
   private final Map<String, byte[]> requestAttributes;
 
+  private final boolean isScanMetricsByRegionEnabled;
+
   public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, 
TableName tableName,
     AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long 
pauseNsForServerOverloaded,
     int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int 
startLogErrorsCnt,
@@ -118,12 +120,17 @@ class AsyncClientScanner {
     this.startLogErrorsCnt = startLogErrorsCnt;
     this.resultCache = createScanResultCache(scan);
     this.requestAttributes = requestAttributes;
+    boolean isScanMetricsByRegionEnabled = false;
     if (scan.isScanMetricsEnabled()) {
       this.scanMetrics = new ScanMetrics();
       consumer.onScanMetricsCreated(scanMetrics);
+      if (this.scan.isScanMetricsByRegionEnabled()) {
+        isScanMetricsByRegionEnabled = true;
+      }
     } else {
       this.scanMetrics = null;
     }
+    this.isScanMetricsByRegionEnabled = isScanMetricsByRegionEnabled;
 
     /*
      * Assumes that the `start()` method is called immediately after 
construction. If this is no
@@ -250,6 +257,9 @@ class AsyncClientScanner {
   }
 
   private void openScanner() {
+    if (this.isScanMetricsByRegionEnabled) {
+      scanMetrics.moveToNextRegion();
+    }
     incRegionCountMetrics(scanMetrics);
     openScannerTries.set(1);
     addListener(timelineConsistentRead(conn.getLocator(), tableName, scan, 
scan.getStartRow(),
@@ -265,6 +275,11 @@ class AsyncClientScanner {
               span.end();
             }
           }
+          if (this.isScanMetricsByRegionEnabled) {
+            HRegionLocation loc = resp.loc;
+            
this.scanMetrics.initScanMetricsRegionInfo(loc.getRegion().getEncodedName(),
+              loc.getServerName());
+          }
           startScan(resp);
         }
       });
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index df7f900830e..a453945e729 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -34,10 +34,12 @@ import org.apache.commons.lang3.mutable.MutableBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -135,6 +137,11 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     this.useScannerTimeoutForNextCalls = 
connectionConfiguration.isUseScannerTimeoutForNextCalls();
     this.requestAttributes = requestAttributes;
 
+    if (scan.isScanMetricsByRegionEnabled() && scan.getConsistency() == 
Consistency.TIMELINE) {
+      scan.setEnableScanMetricsByRegion(false);
+      scanForMetrics.setEnableScanMetricsByRegion(false);
+      LOG.warn("Scan metrics by region is not supported for timeline 
consistency in HBase 2");
+    }
     // check if application wants to collect scan metrics
     initScanMetrics(scan);
 
@@ -259,6 +266,9 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     // clear the current region, we will set a new value to it after the first 
call of the new
     // callable.
     this.currentRegion = null;
+    if (isScanMetricsByRegionEnabled()) {
+      scanMetrics.moveToNextRegion();
+    }
     this.callable = new ScannerCallableWithReplicas(getTable(), 
getConnection(),
       createScannerCallable(), pool, primaryOperationTimeout, scan, 
getRetries(), readRpcTimeout,
       scannerTimeout, useScannerTimeoutForNextCalls, caching, conf, caller);
@@ -281,6 +291,7 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
     Result[] rrs = caller.callWithoutRetries(callable, scannerTimeout);
     if (currentRegion == null && updateCurrentRegion) {
       currentRegion = callable.getHRegionInfo();
+      initScanMetricsRegionInfo();
     }
     return rrs;
   }
@@ -469,7 +480,8 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
       }
       long currentTime = EnvironmentEdgeManager.currentTime();
       if (this.scanMetrics != null) {
-        this.scanMetrics.sumOfMillisSecBetweenNexts.addAndGet(currentTime - 
lastNext);
+        
this.scanMetrics.addToCounter(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME,
+          currentTime - lastNext);
       }
       lastNext = currentTime;
       // Groom the array of Results that we received back from the server 
before adding that
@@ -622,4 +634,12 @@ public abstract class ClientScanner extends 
AbstractClientScanner {
       return nextWithSyncCache();
     }
   }
+
+  private void initScanMetricsRegionInfo() {
+    if (isScanMetricsByRegionEnabled()) {
+      HRegionLocation location = callable.getLocation();
+      String encodedRegionName = location.getRegion().getEncodedName();
+      scanMetrics.initScanMetricsRegionInfo(encodedRegionName, 
location.getServerName());
+    }
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 5059cc273b9..8b8a1cc4034 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -367,9 +367,9 @@ public final class ConnectionUtils {
     if (scanMetrics == null) {
       return;
     }
-    scanMetrics.countOfRPCcalls.incrementAndGet();
+    scanMetrics.addToCounter(ScanMetrics.RPC_CALLS_METRIC_NAME, 1);
     if (isRegionServerRemote) {
-      scanMetrics.countOfRemoteRPCcalls.incrementAndGet();
+      scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME, 1);
     }
   }
 
@@ -377,9 +377,9 @@ public final class ConnectionUtils {
     if (scanMetrics == null) {
       return;
     }
-    scanMetrics.countOfRPCRetries.incrementAndGet();
+    scanMetrics.addToCounter(ScanMetrics.RPC_RETRIES_METRIC_NAME, 1);
     if (isRegionServerRemote) {
-      scanMetrics.countOfRemoteRPCRetries.incrementAndGet();
+      scanMetrics.addToCounter(ScanMetrics.REMOTE_RPC_RETRIES_METRIC_NAME, 1);
     }
   }
 
@@ -394,9 +394,9 @@ public final class ConnectionUtils {
         resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
       }
     }
-    scanMetrics.countOfBytesInResults.addAndGet(resultSize);
+    scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, 
resultSize);
     if (isRegionServerRemote) {
-      scanMetrics.countOfBytesInRemoteResults.addAndGet(resultSize);
+      
scanMetrics.addToCounter(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME, 
resultSize);
     }
   }
 
@@ -416,7 +416,7 @@ public final class ConnectionUtils {
     if (scanMetrics == null) {
       return;
     }
-    scanMetrics.countOfRegions.incrementAndGet();
+    scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
   }
 
   /**
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
index b54541e60f4..ace1f2bf4d0 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ImmutableScan.java
@@ -240,6 +240,12 @@ public final class ImmutableScan extends Scan {
       "ImmutableScan does not allow access to setScanMetricsEnabled");
   }
 
+  @Override
+  public Scan setEnableScanMetricsByRegion(final boolean enable) {
+    throw new UnsupportedOperationException(
+      "ImmutableScan does not allow access to setEnableScanMetricsByRegion");
+  }
+
   @Override
   @Deprecated
   public Scan setAsyncPrefetch(boolean asyncPrefetch) {
@@ -420,6 +426,11 @@ public final class ImmutableScan extends Scan {
     return this.delegateScan.isScanMetricsEnabled();
   }
 
+  @Override
+  public boolean isScanMetricsByRegionEnabled() {
+    return this.delegateScan.isScanMetricsByRegionEnabled();
+  }
+
   @Override
   public Boolean isAsyncPrefetch() {
     return this.delegateScan.isAsyncPrefetch();
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 81186ce656e..c37ee1e35a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -130,6 +130,8 @@ public class Scan extends Query {
   // define this attribute with the appropriate table name by calling
   // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, 
Bytes.toBytes(tableName))
   static public final String SCAN_ATTRIBUTES_TABLE_NAME = 
"scan.attributes.table.name";
+  static private final String SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE =
+    "scan.attributes.metrics.byregion.enable";
 
   /**
    * -1 means no caching specified and the value of {@link 
HConstants#HBASE_CLIENT_SCANNER_CACHING}
@@ -1102,11 +1104,15 @@ public class Scan extends Query {
   }
 
   /**
-   * Enable collection of {@link ScanMetrics}. For advanced users.
+   * Enable collection of {@link ScanMetrics}. For advanced users. Note that 
disabling scan metrics
+   * also disables region level scan metrics.
    * @param enabled Set to true to enable accumulating scan metrics
    */
   public Scan setScanMetricsEnabled(final boolean enabled) {
     setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE, 
Bytes.toBytes(Boolean.valueOf(enabled)));
+    if (!enabled) {
+      setEnableScanMetricsByRegion(false);
+    }
     return this;
   }
 
@@ -1239,4 +1245,22 @@ public class Scan extends Query {
   public static Scan createScanFromCursor(Cursor cursor) {
     return new Scan().withStartRow(cursor.getRow());
   }
+
+  /**
+   * Enables or disables region level scan metrics. When enabling, scan 
metrics are enabled first,
+   * followed by region level scan metrics.
+   * @param enable Set to true to enable region level scan metrics.
+   */
+  public Scan setEnableScanMetricsByRegion(final boolean enable) {
+    if (enable) {
+      setScanMetricsEnabled(true);
+    }
+    setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE, 
Bytes.toBytes(enable));
+    return this;
+  }
+
+  public boolean isScanMetricsByRegionEnabled() {
+    byte[] attr = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_BY_REGION_ENABLE);
+    return attr != null && Bytes.toBoolean(attr);
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 883a1e06185..8ba8df0c6a8 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -228,7 +228,7 @@ public class ScannerCallable extends 
ClientServiceCallable<Result[]> {
         // when what we need is to open scanner against new location.
         // Attach NSRE to signal client that it needs to re-setup scanner.
         if (this.scanMetrics != null) {
-          this.scanMetrics.countOfNSRE.incrementAndGet();
+          
this.scanMetrics.addToCounter(ScanMetrics.NOT_SERVING_REGION_EXCEPTION_METRIC_NAME,
 1);
         }
         throw new DoNotRetryIOException("Resetting the scanner -- see 
exception cause", ioe);
       } else if (ioe instanceof RegionServerStoppedException) {
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 5261ff4af5c..242b9031a4e 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ScannerCallable.MoreResults;
@@ -498,4 +499,8 @@ class ScannerCallableWithReplicas implements 
RetryingCallable<Result[]> {
   public long sleep(long pause, int tries) {
     return currentScannerCallable.sleep(pause, tries);
   }
+
+  public HRegionLocation getLocation() {
+    return currentScannerCallable.getLocation();
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
new file mode 100644
index 00000000000..adece500167
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/RegionScanMetricsData.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Captures region level scan metrics as a map of metric name ({@link String}) 
-> Value
+ * ({@link AtomicLong}). <br/>
+ * <br/>
+ * One instance stores scan metrics for a single region only.
+ */
+@InterfaceAudience.Private
+public class RegionScanMetricsData {
+  private final Map<String, AtomicLong> counters = new HashMap<>();
+  private ScanMetricsRegionInfo scanMetricsRegionInfo =
+    ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO;
+
+  AtomicLong createCounter(String counterName) {
+    return ScanMetricsUtil.createCounter(counters, counterName);
+  }
+
+  void setCounter(String counterName, long value) {
+    ScanMetricsUtil.setCounter(counters, counterName, value);
+  }
+
+  void addToCounter(String counterName, long delta) {
+    ScanMetricsUtil.addToCounter(counters, counterName, delta);
+  }
+
+  Map<String, Long> collectMetrics(boolean reset) {
+    return ScanMetricsUtil.collectMetrics(counters, reset);
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "[" + scanMetricsRegionInfo + "," + 
"Counters=" + counters
+      + "]";
+  }
+
+  /**
+   * Populate encoded region name and server name details if not already 
populated. If details are
+   * already populated and a re-attempt is done then {@link 
UnsupportedOperationException} is
+   * thrown.
+   */
+  void initScanMetricsRegionInfo(String encodedRegionName, ServerName 
serverName) {
+    // Check by reference
+    if (scanMetricsRegionInfo == 
ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO) {
+      scanMetricsRegionInfo = new ScanMetricsRegionInfo(encodedRegionName, 
serverName);
+    } else {
+      throw new UnsupportedOperationException(
+        "ScanMetricsRegionInfo has already been initialized to " + 
scanMetricsRegionInfo
+          + " and cannot be re-initialized to region: " + encodedRegionName + 
" and server: "
+          + serverName);
+    }
+  }
+
+  ScanMetricsRegionInfo getScanMetricsRegionInfo() {
+    return scanMetricsRegionInfo;
+  }
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
index e2038a17b37..8617a851509 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetrics.java
@@ -96,8 +96,22 @@ public class ScanMetrics extends ServerSideScanMetrics {
   public final AtomicLong countOfRemoteRPCRetries = 
createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
 
   /**
-   * constructor
+   * Constructor
    */
   public ScanMetrics() {
   }
+
+  @Override
+  public void moveToNextRegion() {
+    super.moveToNextRegion();
+    currentRegionScanMetricsData.createCounter(RPC_CALLS_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(REMOTE_RPC_CALLS_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(BYTES_IN_RESULTS_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(REGIONS_SCANNED_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(RPC_RETRIES_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(REMOTE_RPC_RETRIES_METRIC_NAME);
+  }
 }
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
new file mode 100644
index 00000000000..58b5b0a80db
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsRegionInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * POJO for capturing region level details when region level scan metrics are 
enabled. <br>
+ * <br>
+ * Currently, encoded region name and server name (host name, ports and 
startcode) are captured as
+ * region details. <br>
+ * <br>
+ * Instance of this class serves as key in the Map returned by
+ * {@link ServerSideScanMetrics#collectMetricsByRegion()} or
+ * {@link ServerSideScanMetrics#collectMetricsByRegion(boolean)}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ScanMetricsRegionInfo {
+  /**
+   * Users should only compare against this constant by reference and should 
not make any
+   * assumptions regarding content of the constant.
+   */
+  public static final ScanMetricsRegionInfo EMPTY_SCAN_METRICS_REGION_INFO =
+    new ScanMetricsRegionInfo(null, null);
+
+  private final String encodedRegionName;
+  private final ServerName serverName;
+
+  ScanMetricsRegionInfo(String encodedRegionName, ServerName serverName) {
+    this.encodedRegionName = encodedRegionName;
+    this.serverName = serverName;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof ScanMetricsRegionInfo)) {
+      return false;
+    }
+    ScanMetricsRegionInfo other = (ScanMetricsRegionInfo) obj;
+    return new EqualsBuilder().append(encodedRegionName, 
other.encodedRegionName)
+      .append(serverName, other.serverName).isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 
37).append(encodedRegionName).append(serverName).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return getClass().getSimpleName() + "[encodedRegionName=" + 
encodedRegionName + ",serverName="
+      + serverName + "]";
+  }
+
+  public String getEncodedRegionName() {
+    return encodedRegionName;
+  }
+
+  public ServerName getServerName() {
+    return serverName;
+  }
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
new file mode 100644
index 00000000000..2e1dfb8a3c4
--- /dev/null
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ScanMetricsUtil.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public final class ScanMetricsUtil {
+
+  private ScanMetricsUtil() {
+  }
+
+  /**
+   * Creates a new counter with the specified name and stores it in the 
counters map.
+   * @return {@link AtomicLong} instance for the counter with counterName
+   */
+  static AtomicLong createCounter(Map<String, AtomicLong> counters, String 
counterName) {
+    AtomicLong c = new AtomicLong(0);
+    counters.put(counterName, c);
+    return c;
+  }
+
+  /**
+   * Sets counter with counterName to passed in value, does nothing if counter 
does not exist.
+   */
+  static void setCounter(Map<String, AtomicLong> counters, String counterName, 
long value) {
+    AtomicLong c = counters.get(counterName);
+    if (c != null) {
+      c.set(value);
+    }
+  }
+
+  /**
+   * Increments the counter with counterName by delta, does nothing if counter 
does not exist.
+   */
+  static void addToCounter(Map<String, AtomicLong> counters, String 
counterName, long delta) {
+    AtomicLong c = counters.get(counterName);
+    if (c != null) {
+      c.addAndGet(delta);
+    }
+  }
+
+  /**
+   * Returns true if a counter exists with the counterName.
+   */
+  static boolean hasCounter(Map<String, AtomicLong> counters, String 
counterName) {
+    return counters.containsKey(counterName);
+  }
+
+  /**
+   * Returns {@link AtomicLong} instance for this counter name, null if 
counter does not exist.
+   */
+  static AtomicLong getCounter(Map<String, AtomicLong> counters, String 
counterName) {
+    return counters.get(counterName);
+  }
+
+  /**
+   * Get all the values. If reset is true, we will reset all the AtomicLongs 
back to 0.
+   * @param reset whether to reset the AtomicLongs to 0.
+   * @return A Map of String -> Long for metrics
+   */
+  static Map<String, Long> collectMetrics(Map<String, AtomicLong> counters, 
boolean reset) {
+    Map<String, Long> metricsSnapshot = new HashMap<>();
+    for (Map.Entry<String, AtomicLong> e : counters.entrySet()) {
+      long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
+      metricsSnapshot.put(e.getKey(), value);
+    }
+    return metricsSnapshot;
+  }
+}
diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index 93fd8890d33..916451df75d 100644
--- 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ 
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -17,9 +17,13 @@
  */
 package org.apache.hadoop.hbase.client.metrics;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -31,18 +35,31 @@ import 
org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 @SuppressWarnings("checkstyle:VisibilityModifier") // See HBASE-27757
 public class ServerSideScanMetrics {
   /**
-   * Hash to hold the String -&gt; Atomic Long mappings for each metric
+   * Hash to hold the String -> Atomic Long mappings for each metric
    */
   private final Map<String, AtomicLong> counters = new HashMap<>();
+  private final List<RegionScanMetricsData> regionScanMetricsData = new 
ArrayList<>(0);
+  protected RegionScanMetricsData currentRegionScanMetricsData = null;
 
   /**
-   * Create a new counter with the specified name
+   * If region level scan metrics are enabled, must call this method to start 
collecting metrics for
+   * the region before scanning the region.
+   */
+  public void moveToNextRegion() {
+    currentRegionScanMetricsData = new RegionScanMetricsData();
+    regionScanMetricsData.add(currentRegionScanMetricsData);
+    
currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME);
+    
currentRegionScanMetricsData.createCounter(BLOCK_BYTES_SCANNED_KEY_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(FS_READ_TIME_METRIC_NAME);
+  }
+
+  /**
+   * Create a new counter with the specified name.
    * @return {@link AtomicLong} instance for the counter with counterName
    */
   protected AtomicLong createCounter(String counterName) {
-    AtomicLong c = new AtomicLong(0);
-    counters.put(counterName, c);
-    return c;
+    return ScanMetricsUtil.createCounter(counters, counterName);
   }
 
   public static final String COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME = 
"ROWS_SCANNED";
@@ -85,52 +102,106 @@ public class ServerSideScanMetrics {
 
   public final AtomicLong fsReadTime = createCounter(FS_READ_TIME_METRIC_NAME);
 
+  /**
+   * Sets counter with counterName to passed in value, does nothing if counter 
does not exist. If
+   * region level scan metrics are enabled then sets the value of counter for 
the current region
+   * being scanned.
+   */
   public void setCounter(String counterName, long value) {
-    AtomicLong c = this.counters.get(counterName);
-    if (c != null) {
-      c.set(value);
+    ScanMetricsUtil.setCounter(counters, counterName, value);
+    if (this.currentRegionScanMetricsData != null) {
+      this.currentRegionScanMetricsData.setCounter(counterName, value);
     }
   }
 
-  /** Returns true if a counter exists with the counterName */
+  /**
+   * Returns true if a counter exists with the counterName.
+   */
   public boolean hasCounter(String counterName) {
-    return this.counters.containsKey(counterName);
+    return ScanMetricsUtil.hasCounter(counters, counterName);
   }
 
-  /** Returns {@link AtomicLong} instance for this counter name, null if 
counter does not exist. */
+  /**
+   * Returns {@link AtomicLong} instance for this counter name, null if 
counter does not exist.
+   */
   public AtomicLong getCounter(String counterName) {
-    return this.counters.get(counterName);
+    return ScanMetricsUtil.getCounter(counters, counterName);
   }
 
+  /**
+   * Increments the counter with counterName by delta, does nothing if counter 
does not exist. If
+   * region level scan metrics are enabled then increments the counter 
corresponding to the current
+   * region being scanned. Please see {@link #moveToNextRegion()}.
+   */
   public void addToCounter(String counterName, long delta) {
-    AtomicLong c = this.counters.get(counterName);
-    if (c != null) {
-      c.addAndGet(delta);
+    ScanMetricsUtil.addToCounter(counters, counterName, delta);
+    if (this.currentRegionScanMetricsData != null) {
+      this.currentRegionScanMetricsData.addToCounter(counterName, delta);
     }
   }
 
   /**
-   * Get all of the values since the last time this function was called. 
Calling this function will
-   * reset all AtomicLongs in the instance back to 0.
-   * @return A Map of String -&gt; Long for metrics
+   * Get all the values combined for all the regions since the last time this 
function was called.
+   * Calling this function will reset all AtomicLongs in the instance back to 
0.
+   * @return A Map of String -> Long for metrics
    */
   public Map<String, Long> getMetricsMap() {
     return getMetricsMap(true);
   }
 
   /**
-   * Get all of the values. If reset is true, we will reset the all 
AtomicLongs back to 0.
+   * Get all the values combined for all the regions. If reset is true, we 
will reset all the
+   * AtomicLongs back to 0.
    * @param reset whether to reset the AtomicLongs to 0.
-   * @return A Map of String -&gt; Long for metrics
+   * @return A Map of String -> Long for metrics
    */
   public Map<String, Long> getMetricsMap(boolean reset) {
+    return ImmutableMap.copyOf(ScanMetricsUtil.collectMetrics(counters, 
reset));
+  }
+
+  /**
+   * Get values grouped by each region scanned since the last time this was 
called. Calling this
+   * function will reset all region level scan metrics counters back to 0.
+   * @return A Map of region -> (Map of metric name -> Long) for metrics
+   */
+  public Map<ScanMetricsRegionInfo, Map<String, Long>> 
collectMetricsByRegion() {
+    return collectMetricsByRegion(true);
+  }
+
+  /**
+   * Get values grouped by each region scanned. If reset is true, will reset 
all the region level
+   * scan metrics counters back to 0.
+   * @param reset whether to reset region level scan metric counters to 0.
+   * @return A Map of region -> (Map of metric name -> Long) for metrics
+   */
+  public Map<ScanMetricsRegionInfo, Map<String, Long>> 
collectMetricsByRegion(boolean reset) {
     // Create a builder
-    ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
-    for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
-      long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
-      builder.put(e.getKey(), value);
+    ImmutableMap.Builder<ScanMetricsRegionInfo, Map<String, Long>> builder = 
ImmutableMap.builder();
+    for (RegionScanMetricsData regionScanMetricsData : 
this.regionScanMetricsData) {
+      if (
+        regionScanMetricsData.getScanMetricsRegionInfo()
+            == ScanMetricsRegionInfo.EMPTY_SCAN_METRICS_REGION_INFO
+      ) {
+        continue;
+      }
+      builder.put(regionScanMetricsData.getScanMetricsRegionInfo(),
+        regionScanMetricsData.collectMetrics(reset));
     }
-    // Build the immutable map so that people can't mess around with it.
     return builder.build();
   }
+
+  @Override
+  public String toString() {
+    return counters + "," + 
regionScanMetricsData.stream().map(RegionScanMetricsData::toString)
+      .collect(Collectors.joining(","));
+  }
+
+  /**
+   * Call this method after calling {@link #moveToNextRegion()} to populate 
server name and encoded
+   * region name details for the region being scanned and for which metrics 
are being collected at
+   * the moment.
+   */
+  public void initScanMetricsRegionInfo(String encodedRegionName, ServerName 
serverName) {
+    currentRegionScanMetricsData.initScanMetricsRegionInfo(encodedRegionName, 
serverName);
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 144c01de874..df99fd40338 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
 import org.apache.hadoop.hbase.mob.MobFileCache;
@@ -90,6 +91,12 @@ public class ClientSideRegionScanner extends 
AbstractClientScanner {
       initScanMetrics(scan);
     } else {
       this.scanMetrics = scanMetrics;
+      setIsScanMetricsByRegionEnabled(scan.isScanMetricsByRegionEnabled());
+    }
+    if (isScanMetricsByRegionEnabled()) {
+      this.scanMetrics.moveToNextRegion();
+      
this.scanMetrics.initScanMetricsRegionInfo(region.getRegionInfo().getEncodedName(),
 null);
+      // The server name will be null in scan metrics as this is a client side 
region scanner
     }
     region.startRegionOperation();
   }
@@ -110,8 +117,8 @@ public class ClientSideRegionScanner extends 
AbstractClientScanner {
       for (Cell cell : values) {
         resultSize += PrivateCellUtil.estimatedSerializedSizeOf(cell);
       }
-      this.scanMetrics.countOfBytesInResults.addAndGet(resultSize);
-      this.scanMetrics.countOfRowsScanned.incrementAndGet();
+      this.scanMetrics.addToCounter(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME, 
resultSize);
+      
this.scanMetrics.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME,
 1);
     }
 
     return result;
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
index 872c9a163c6..9e01e34877c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/TableSnapshotScanner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.regionserver.MemStoreLAB;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -187,7 +188,7 @@ public class TableSnapshotScanner extends 
AbstractClientScanner {
         currentRegionScanner =
           new ClientSideRegionScanner(conf, fs, restoreDir, htd, hri, scan, 
scanMetrics);
         if (this.scanMetrics != null) {
-          this.scanMetrics.countOfRegions.incrementAndGet();
+          
this.scanMetrics.addToCounter(ScanMetrics.REGIONS_SCANNED_METRIC_NAME, 1);
         }
       }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 26006676596..00ba1233424 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
@@ -3567,10 +3568,12 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler, AdminService.Blockin
         if (trackMetrics) {
           // rather than increment yet another counter in StoreScanner, just 
set the value here
           // from block size progress before writing into the response
-          scannerContext.getMetrics().countOfBlockBytesScanned
-            .set(scannerContext.getBlockSizeProgress());
+          scannerContext.getMetrics().setCounter(
+            ServerSideScanMetrics.BLOCK_BYTES_SCANNED_KEY_METRIC_NAME,
+            scannerContext.getBlockSizeProgress());
           if (rpcCall != null) {
-            
scannerContext.getMetrics().fsReadTime.set(rpcCall.getFsReadTime());
+            
scannerContext.getMetrics().setCounter(ServerSideScanMetrics.FS_READ_TIME_METRIC_NAME,
+              rpcCall.getFsReadTime());
           }
           Map<String, Long> metrics = 
scannerContext.getMetrics().getMetricsMap();
           ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 962c0f68406..0c0679082cc 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.IsolationLevel;
 import org.apache.hadoop.hbase.client.PackagePrivateFieldAccessor;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.ipc.CallerDisconnectedException;
@@ -644,7 +645,8 @@ public class RegionScannerImpl implements RegionScanner, 
Shipper, RpcCallback {
       return;
     }
 
-    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
+    scannerContext.getMetrics()
+      
.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, 1);
   }
 
   private void incrementCountOfRowsScannedMetric(ScannerContext 
scannerContext) {
@@ -652,7 +654,8 @@ public class RegionScannerImpl implements RegionScanner, 
Shipper, RpcCallback {
       return;
     }
 
-    scannerContext.getMetrics().countOfRowsScanned.incrementAndGet();
+    scannerContext.getMetrics()
+      
.addToCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, 1);
   }
 
   /** Returns true when the joined heap may have data for the current row */
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
index aebdb5f3a13..f9209c246d1 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetrics.java
@@ -17,19 +17,28 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ForkJoinPool;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -93,12 +102,12 @@ public class TestAsyncTableScanMetrics {
   @BeforeClass
   public static void setUp() throws Exception {
     UTIL.startMiniCluster(3);
-    // Create 3 rows in the table, with rowkeys starting with "zzz*" so that
-    // scan are forced to hit all the regions.
+    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" 
and "zzz*" so that
+    // the scan hits all the regions and the rows do not all lie in a single region
     try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
-      table.put(Arrays.asList(new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, 
VALUE),
-        new Put(Bytes.toBytes("zzz2")).addColumn(CF, CQ, VALUE),
-        new Put(Bytes.toBytes("zzz3")).addColumn(CF, CQ, VALUE)));
+      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, 
VALUE),
+        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
     }
     CONN = 
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
     NUM_REGIONS = UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
@@ -141,25 +150,101 @@ public class TestAsyncTableScanMetrics {
   }
 
   @Test
-  public void testNoScanMetrics() throws Exception {
+  public void testScanMetricsDisabled() throws Exception {
     Pair<List<Result>, ScanMetrics> pair = method.scan(new Scan());
     assertEquals(3, pair.getFirst().size());
+    // Assert no scan metrics
     assertNull(pair.getSecond());
   }
 
   @Test
-  public void testScanMetrics() throws Exception {
-    Pair<List<Result>, ScanMetrics> pair = method.scan(new 
Scan().setScanMetricsEnabled(true));
+  public void testScanMetricsWithScanMetricsByRegionDisabled() throws 
Exception {
+    Scan scan = new Scan();
+    scan.setScanMetricsEnabled(true);
+    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
     List<Result> results = pair.getFirst();
     assertEquals(3, results.size());
-    long bytes = results.stream().flatMap(r -> 
Arrays.asList(r.rawCells()).stream())
-      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
+    long bytes = getBytesOfResults(results);
+    ScanMetrics scanMetrics = pair.getSecond();
+    assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+    assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
+    // Assert scan metrics have not been collected by region
+    assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+  }
+
+  @Test
+  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+    Scan scan = new Scan();
+    scan.withStartRow(Bytes.toBytes("zzz1"), true);
+    scan.withStopRow(Bytes.toBytes("zzz1"), true);
+    scan.setEnableScanMetricsByRegion(true);
+    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+    List<Result> results = pair.getFirst();
+    assertEquals(1, results.size());
+    long bytes = getBytesOfResults(results);
+    ScanMetrics scanMetrics = pair.getSecond();
+    assertEquals(1, scanMetrics.countOfRegions.get());
+    assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+    assertEquals(1, scanMetrics.countOfRPCcalls.get());
+    // Assert scan metrics by region were collected for the region scanned
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    assertEquals(1, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo smri = entry.getKey();
+      Map<String, Long> metrics = entry.getValue();
+      assertNotNull(smri.getServerName());
+      assertNotNull(smri.getEncodedRegionName());
+      // Assert overall scan metrics and scan metrics by region should be 
equal as only 1 region
+      // was scanned.
+      assertEquals(scanMetrics.getMetricsMap(false), metrics);
+    }
+  }
+
+  @Test
+  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+    Scan scan = new Scan();
+    scan.setEnableScanMetricsByRegion(true);
+    Pair<List<Result>, ScanMetrics> pair = method.scan(scan);
+    List<Result> results = pair.getFirst();
+    assertEquals(3, results.size());
+    long bytes = getBytesOfResults(results);
     ScanMetrics scanMetrics = pair.getSecond();
+    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+    assertEquals(NUM_REGIONS, (long) 
overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
     assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+    assertEquals(bytes, (long) 
overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
     assertEquals(bytes, scanMetrics.countOfBytesInResults.get());
+    assertEquals(NUM_REGIONS, (long) 
overallMetrics.get(RPC_CALLS_METRIC_NAME));
     assertEquals(NUM_REGIONS, scanMetrics.countOfRPCcalls.get());
-    // also assert a server side metric to ensure that we have published them 
into the client side
-    // metrics.
-    assertEquals(3, scanMetrics.countOfRowsScanned.get());
+    // Assert scan metrics by region were collected for each region scanned
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
+    int rowsScannedAcrossAllRegions = 0;
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo smri = entry.getKey();
+      Map<String, Long> perRegionMetrics = entry.getValue();
+      assertNotNull(smri.getServerName());
+      assertNotNull(smri.getEncodedRegionName());
+      assertEquals(1, (long) 
perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+      if (perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
+        bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+        assertEquals(bytes, (long) 
perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+        rowsScannedAcrossAllRegions++;
+      } else {
+        assertEquals(0, (long) 
perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+        assertEquals(0, (long) 
perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+      }
+    }
+    assertEquals(3, rowsScannedAcrossAllRegions);
+  }
+
+  static long getBytesOfResults(List<Result> results) {
+    return results.stream().flatMap(r -> Arrays.asList(r.rawCells()).stream())
+      .mapToLong(c -> PrivateCellUtil.estimatedSerializedSizeOf(c)).sum();
   }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
new file mode 100644
index 00000000000..53eaccaec0e
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanMetricsWithScannerSuspending.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static 
org.apache.hadoop.hbase.client.TestAsyncTableScanMetrics.getBytesOfResults;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_CALLS_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanMetricsWithScannerSuspending {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    
HBaseClassTestRule.forClass(TestAsyncTableScanMetricsWithScannerSuspending.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME =
+    
TableName.valueOf(TestAsyncTableScanMetricsWithScannerSuspending.class.getSimpleName());
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static AsyncConnection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" 
and "zzz*" so that
+    // the scan hits all the regions and the rows do not all lie in a single region
+    try (Table table = UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, 
VALUE),
+        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
+    }
+    CONN = 
ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testScanMetricsByRegionWithScannerSuspending() throws Exception {
+    // Setup scan
+    Scan scan = new Scan();
+    scan.withStartRow(Bytes.toBytes("xxx1"), true);
+    scan.withStopRow(Bytes.toBytes("zzz1"), true);
+    scan.setEnableScanMetricsByRegion(true);
+    scan.setMaxResultSize(1);
+
+    // Prepare scanner
+    final AtomicInteger rowsReadCounter = new AtomicInteger(0);
+    AsyncTableResultScanner scanner = new AsyncTableResultScanner(TABLE_NAME, 
scan, 1) {
+      @Override
+      public void onNext(Result[] results, ScanController controller) {
+        rowsReadCounter.addAndGet(results.length);
+        super.onNext(results, controller);
+      }
+    };
+
+    // Do the scan so that rows get loaded in the scanner (consumer)
+    CONN.getTable(TABLE_NAME).scan(scan, scanner);
+
+    List<Result> results = new ArrayList<>();
+    int expectedTotalRows = 3;
+    // Assert that rows are loaded one at a time, since maxCacheSize is set to 1 byte
+    for (int i = 1; i <= expectedTotalRows; i++) {
+      UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+
+        @Override
+        public boolean evaluate() throws Exception {
+          return scanner.isSuspended();
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "The given scanner has been suspended in time";
+        }
+      });
+      assertTrue(scanner.isSuspended());
+      assertEquals(i, rowsReadCounter.get());
+      results.add(scanner.next());
+    }
+    Assert.assertNull(scanner.next());
+
+    // Assert on overall scan metrics and scan metrics by region
+    ScanMetrics scanMetrics = scanner.getScanMetrics();
+    // Assert on overall scan metrics
+    long bytes = getBytesOfResults(results);
+    Map<String, Long> overallMetrics = scanMetrics.getMetricsMap(false);
+    assertEquals(3, (long) overallMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+    assertEquals(bytes, (long) 
overallMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+    // 1 Extra RPC call per region where no row is returned but 
moreResultsInRegion is set to false
+    assertEquals(6, (long) overallMetrics.get(RPC_CALLS_METRIC_NAME));
+    // Assert scan metrics by region were collected for each region scanned
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    assertEquals(3, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo smri = entry.getKey();
+      Map<String, Long> perRegionMetrics = entry.getValue();
+      assertNotNull(smri.getServerName());
+      assertNotNull(smri.getEncodedRegionName());
+      assertEquals(1, (long) 
perRegionMetrics.get(REGIONS_SCANNED_METRIC_NAME));
+      assertEquals(1, (long) 
perRegionMetrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+      bytes = getBytesOfResults(Collections.singletonList(results.get(0)));
+      assertEquals(bytes, (long) 
perRegionMetrics.get(BYTES_IN_RESULTS_METRIC_NAME));
+      assertEquals(2, (long) perRegionMetrics.get(RPC_CALLS_METRIC_NAME));
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
index e0e7187da91..78041f8b972 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientSideRegionScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -29,6 +30,7 @@ import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -37,6 +39,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.IndexOnlyLruBlockCache;
@@ -46,6 +50,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -179,6 +184,86 @@ public class TestClientSideRegionScanner {
     }
   }
 
+  @Test
+  public void testScanMetricsDisabled() throws IOException {
+    Configuration copyConf = new Configuration(conf);
+    Scan scan = new Scan();
+    try (ClientSideRegionScanner clientSideRegionScanner =
+      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, 
null)) {
+      clientSideRegionScanner.next();
+      assertNull(clientSideRegionScanner.getScanMetrics());
+    }
+  }
+
+  private void testScanMetricsWithScanMetricsByRegionDisabled(ScanMetrics 
scanMetrics)
+    throws IOException {
+    Configuration copyConf = new Configuration(conf);
+    Scan scan = new Scan();
+    scan.setScanMetricsEnabled(true);
+    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+    try (ClientSideRegionScanner clientSideRegionScanner =
+      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, 
scanMetrics)) {
+      clientSideRegionScanner.next();
+      ScanMetrics scanMetricsFromScanner = 
clientSideRegionScanner.getScanMetrics();
+      assertNotNull(scanMetricsFromScanner);
+      if (scanMetrics != null) {
+        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+      }
+      Map<String, Long> metricsMap = 
scanMetricsFromScanner.getMetricsMap(false);
+      Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) 
> 0);
+      
Assert.assertTrue(scanMetricsFromScanner.collectMetricsByRegion(false).isEmpty());
+    }
+  }
+
+  @Test
+  public void testScanMetricsNotAsInputWithScanMetricsByRegionDisabled() 
throws IOException {
+    testScanMetricsWithScanMetricsByRegionDisabled(null);
+  }
+
+  @Test
+  public void testScanMetricsAsInputWithScanMetricsByRegionDisabled() throws 
IOException {
+    testScanMetricsWithScanMetricsByRegionDisabled(new ScanMetrics());
+  }
+
+  private void testScanMetricByRegion(ScanMetrics scanMetrics) throws 
IOException {
+    Configuration copyConf = new Configuration(conf);
+    Scan scan = new Scan();
+    scan.setEnableScanMetricsByRegion(true);
+    TEST_UTIL.getAdmin().flush(TableName.META_TABLE_NAME);
+    try (ClientSideRegionScanner clientSideRegionScanner =
+      new ClientSideRegionScanner(copyConf, fs, rootDir, htd, hri, scan, 
scanMetrics)) {
+      clientSideRegionScanner.next();
+      ScanMetrics scanMetricsFromScanner = 
clientSideRegionScanner.getScanMetrics();
+      assertNotNull(scanMetricsFromScanner);
+      if (scanMetrics != null) {
+        Assert.assertSame(scanMetrics, scanMetricsFromScanner);
+      }
+      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+        scanMetricsFromScanner.collectMetricsByRegion();
+      Assert.assertEquals(1, scanMetricsByRegion.size());
+      for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+        .entrySet()) {
+        ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+        Map<String, Long> metricsMap = entry.getValue();
+        Assert.assertEquals(hri.getEncodedName(), 
scanMetricsRegionInfo.getEncodedRegionName());
+        Assert.assertNull(scanMetricsRegionInfo.getServerName());
+        
Assert.assertTrue(metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) > 0);
+        Assert.assertEquals((long) 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME),
+          scanMetricsFromScanner.countOfRowsScanned.get());
+      }
+    }
+  }
+
+  @Test
+  public void testScanMetricsByRegionWithoutScanMetricsAsInput() throws 
IOException {
+    testScanMetricByRegion(null);
+  }
+
+  @Test
+  public void testScanMetricsByRegionWithScanMetricsAsInput() throws 
IOException {
+    testScanMetricByRegion(new ScanMetrics());
+  }
+
   private static Put createPut(int rowAsInt) {
     byte[] row = Bytes.toBytes(rowAsInt);
     Put put = new Put(row);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 4c30c2edff0..5d372ac7b70 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -26,6 +28,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -42,8 +45,10 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -58,6 +63,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.zookeeper.KeeperException;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -94,6 +100,7 @@ public class TestReplicasClient {
   private static final byte[] f = HConstants.CATALOG_FAMILY;
 
   private final static int REFRESH_PERIOD = 1000;
+  private static ServerName rsServerName;
 
   /**
    * This copro is used to synchronize the tests.
@@ -232,6 +239,9 @@ public class TestReplicasClient {
     Configuration c = new Configuration(HTU.getConfiguration());
     c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     LOG.info("Master has stopped");
+
+    rsServerName = HTU.getHBaseCluster().getRegionServer(0).getServerName();
+    Assert.assertNotNull(rsServerName);
   }
 
   @AfterClass
@@ -777,4 +787,74 @@ public class TestReplicasClient {
       }
     }
   }
+
+  @Test
+  public void testScanMetricsByRegion() throws Exception {
+    byte[] b1 = Bytes.toBytes("testScanMetricsByRegion");
+    openRegion(hriSecondary);
+
+    try {
+      Put p = new Put(b1);
+      p.addColumn(f, b1, b1);
+      table.put(p);
+      LOG.info("Put done");
+      flushRegion(hriPrimary);
+
+      // Sleep for 2 * REFRESH_PERIOD so that flushed data is visible to 
secondary replica
+      Thread.sleep(2 * REFRESH_PERIOD);
+
+      // Explicitly read replica 0
+      Scan scan = new Scan();
+      scan.setEnableScanMetricsByRegion(true);
+      scan.withStartRow(b1, true);
+      scan.withStopRow(b1, true);
+      // Assert row was read from primary replica along with asserting scan 
metrics by region
+      assertScanMetrics(scan, hriPrimary, false);
+      LOG.info("Scanned primary replica");
+
+      // Read from region replica
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
+      scan = new Scan();
+      scan.setEnableScanMetricsByRegion(true);
+      scan.withStartRow(b1, true);
+      scan.withStopRow(b1, true);
+      scan.setConsistency(Consistency.TIMELINE);
+      // Assert row was read from secondary replica along with asserting no 
scan metrics by region
+      // for timeline consistency
+      assertScanMetrics(scan, null, true);
+      LOG.info("Scanned secondary replica ");
+    } finally {
+      SlowMeCopro.getPrimaryCdl().get().countDown();
+      Delete d = new Delete(b1);
+      table.delete(d);
+      closeRegion(hriSecondary);
+    }
+  }
+
+  private void assertScanMetrics(Scan scan, RegionInfo regionInfo, boolean 
isStale)
+    throws IOException {
+    try (ResultScanner rs = table.getScanner(scan);) {
+      for (Result r : rs) {
+        Assert.assertEquals(isStale, r.isStale());
+        Assert.assertFalse(r.isEmpty());
+      }
+      Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+        rs.getScanMetrics().collectMetricsByRegion(false);
+      if (isStale) {
+        Assert.assertTrue(scanMetricsByRegion.isEmpty());
+      } else {
+        Assert.assertEquals(1, scanMetricsByRegion.size());
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+          .entrySet()) {
+          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+          Map<String, Long> metrics = entry.getValue();
+          Assert.assertEquals(rsServerName, 
scanMetricsRegionInfo.getServerName());
+          Assert.assertEquals(regionInfo.getEncodedName(),
+            scanMetricsRegionInfo.getEncodedRegionName());
+          Assert.assertEquals(1, (long) 
metrics.get(REGIONS_SCANNED_METRIC_NAME));
+          Assert.assertEquals(1, (long) 
metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+        }
+      }
+    }
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
new file mode 100644
index 00000000000..36935511734
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanAttributes.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestScanAttributes {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestScanAttributes.class);
+
+  @Test
+  public void testCoEnableAndCoDisableScanMetricsAndScanMetricsByRegion() {
+    Scan scan = new Scan();
+    Assert.assertFalse(scan.isScanMetricsEnabled());
+    Assert.assertFalse(scan.isScanMetricsByRegionEnabled());
+
+    // Assert enabling scan metrics by region enables scan metrics also
+    scan.setEnableScanMetricsByRegion(true);
+    Assert.assertTrue(scan.isScanMetricsEnabled());
+    Assert.assertTrue(scan.isScanMetricsByRegionEnabled());
+
+    // Assert disabling scan metrics disables scan metrics by region
+    scan.setScanMetricsEnabled(false);
+    Assert.assertFalse(scan.isScanMetricsEnabled());
+    Assert.assertFalse(scan.isScanMetricsByRegionEnabled());
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
new file mode 100644
index 00000000000..6f84ddcc314
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
@@ -0,0 +1,710 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.HConstants.EMPTY_BYTE_ARRAY;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.NOT_SERVING_REGION_EXCEPTION_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@Category({ ClientTests.class, MediumTests.class })
+public class TestTableScanMetrics extends FromClientSideBase {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestTableScanMetrics.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME =
+    TableName.valueOf(TestTableScanMetrics.class.getSimpleName());
+
+  private static final byte[] CF = Bytes.toBytes("cf");
+
+  private static final byte[] CQ = Bytes.toBytes("cq");
+
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  private static final Random RAND = new Random(11);
+
+  private static int NUM_REGIONS;
+
+  private static Connection CONN;
+
+  @Parameters(name = "{index}: scanner={0}")
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { "ForwardScanner", new Scan() },
+      new Object[] { "ReverseScanner", new Scan().setReversed(true) });
+  }
+
+  @Parameter(0)
+  public String scannerName;
+
+  @Parameter(1)
+  public Scan originalScan;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // Start the minicluster
+    TEST_UTIL.startMiniCluster(2);
+    // Create 3 rows in the table, with rowkeys starting with "xxx*", "yyy*" 
and "zzz*" so that
+    // scan hits all the regions and not all rows lie in a single region
+    try (Table table = TEST_UTIL.createMultiRegionTable(TABLE_NAME, CF)) {
+      table.put(Arrays.asList(new Put(Bytes.toBytes("xxx1")).addColumn(CF, CQ, 
VALUE),
+        new Put(Bytes.toBytes("yyy1")).addColumn(CF, CQ, VALUE),
+        new Put(Bytes.toBytes("zzz1")).addColumn(CF, CQ, VALUE)));
+    }
+    CONN = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    NUM_REGIONS = TEST_UTIL.getHBaseCluster().getRegions(TABLE_NAME).size();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private Scan generateScan(byte[] smallerRow, byte[] largerRow) throws 
IOException {
+    Scan scan = new Scan(originalScan);
+    if (originalScan.isReversed()) {
+      scan.withStartRow(largerRow, true);
+      scan.withStopRow(smallerRow, true);
+    } else {
+      scan.withStartRow(smallerRow, true);
+      scan.withStopRow(largerRow, true);
+    }
+    return scan;
+  }
+
+  private ScanMetrics assertScannedRowsAndGetScanMetrics(Scan scan, int 
expectedCount)
+    throws IOException {
+    int countOfRows = 0;
+    ScanMetrics scanMetrics;
+    try (Table table = CONN.getTable(TABLE_NAME); ResultScanner scanner = 
table.getScanner(scan)) {
+      for (Result result : scanner) {
+        Assert.assertFalse(result.isEmpty());
+        countOfRows++;
+      }
+      scanMetrics = scanner.getScanMetrics();
+    }
+    Assert.assertEquals(expectedCount, countOfRows);
+    return scanMetrics;
+  }
+
+  @Test
+  public void testScanMetricsDisabled() throws Exception {
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 3);
+    Assert.assertNull(scanMetrics);
+  }
+
+  @Test
+  public void testScanMetricsWithScanMetricByRegionDisabled() throws Exception 
{
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+    scan.setScanMetricsEnabled(true);
+    int expectedRowsScanned = 3;
+    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 
expectedRowsScanned);
+    Assert.assertNotNull(scanMetrics);
+    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
+    // The test setup is such that we have 1 row per region in the scan range
+    Assert.assertEquals(expectedRowsScanned, scanMetrics.countOfRegions.get());
+    Assert.assertEquals(expectedRowsScanned,
+      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    Assert.assertTrue(scanMetrics.collectMetricsByRegion().isEmpty());
+  }
+
+  @Test
+  public void testScanMetricsResetWithScanMetricsByRegionDisabled() throws 
Exception {
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+    scan.setScanMetricsEnabled(true);
+    int expectedRowsScanned = 3;
+    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 
expectedRowsScanned);
+    Assert.assertNotNull(scanMetrics);
+    // By default counters are collected with reset as true
+    Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+    Assert.assertEquals(expectedRowsScanned, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+    Assert.assertEquals(expectedRowsScanned,
+      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    // The call above reset the counters, so reading them directly should now show 0
+    Assert.assertEquals(0, scanMetrics.countOfRegions.get());
+    Assert.assertEquals(0, scanMetrics.countOfRowsScanned.get());
+  }
+
+  @Test
+  public void testScanMetricsByRegionForSingleRegionScan() throws Exception {
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("xxx1"));
+    scan.setEnableScanMetricsByRegion(true);
+    int expectedRowsScanned = 1;
+    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 
expectedRowsScanned);
+    Assert.assertNotNull(scanMetrics);
+    Map<String, Long> metricsMap = scanMetrics.getMetricsMap(false);
+    Assert.assertEquals(expectedRowsScanned, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+    Assert.assertEquals(expectedRowsScanned,
+      (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+      metricsMap = entry.getValue();
+      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+      // As we are scanning a single row, the overall scan metrics will match the 
per-region scan metrics
+      Assert.assertEquals(expectedRowsScanned, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+      Assert.assertEquals(expectedRowsScanned,
+        (long) metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    }
+  }
+
+  @Test
+  public void testScanMetricsByRegionForMultiRegionScan() throws Exception {
+    Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+    scan.setEnableScanMetricsByRegion(true);
+    int expectedRowsScanned = 3;
+    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 
expectedRowsScanned);
+    Assert.assertNotNull(scanMetrics);
+    Assert.assertEquals(NUM_REGIONS, scanMetrics.countOfRegions.get());
+    Assert.assertEquals(expectedRowsScanned, 
scanMetrics.countOfRowsScanned.get());
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion(false);
+    Assert.assertEquals(NUM_REGIONS, scanMetricsByRegion.size());
+    int rowsScannedAcrossAllRegions = 0;
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+      Map<String, Long> metricsMap = entry.getValue();
+      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+      Assert.assertEquals(1, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+      if (metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME) == 1) {
+        rowsScannedAcrossAllRegions++;
+      } else {
+        assertEquals(0, (long) 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+      }
+    }
+    Assert.assertEquals(expectedRowsScanned, rowsScannedAcrossAllRegions);
+  }
+
+  @Test
+  public void testScanMetricsByRegionReset() throws Exception {
+    Scan scan = generateScan(Bytes.toBytes("xxx1"), Bytes.toBytes("zzz1"));
+    scan.setEnableScanMetricsByRegion(true);
+    int expectedRowsScanned = 3;
+    ScanMetrics scanMetrics = assertScannedRowsAndGetScanMetrics(scan, 
expectedRowsScanned);
+    Assert.assertNotNull(scanMetrics);
+
+    // Retrieve scan metrics by region as a map and reset
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+      scanMetrics.collectMetricsByRegion();
+    // We scan 1 row per region
+    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+      Map<String, Long> metricsMap = entry.getValue();
+      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+      Assert.assertEquals(1, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+      Assert.assertEquals(1, (long) 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    }
+
+    // Scan metrics have already been reset and now all counters should be 0
+    scanMetricsByRegion = scanMetrics.collectMetricsByRegion(false);
+    // Size of map should be same as earlier
+    Assert.assertEquals(expectedRowsScanned, scanMetricsByRegion.size());
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+      .entrySet()) {
+      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+      Map<String, Long> metricsMap = entry.getValue();
+      Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+      Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+      // Counters should have been reset to 0
+      Assert.assertEquals(0, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+      Assert.assertEquals(0, (long) 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    }
+  }
+
+  @Test
+  public void testConcurrentUpdatesAndResetOfScanMetricsByRegion() throws 
Exception {
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) 
Executors.newFixedThreadPool(2);
+    TableName tableName = 
TableName.valueOf(TestTableScanMetrics.class.getSimpleName()
+      + "_testConcurrentUpdatesAndResetToScanMetricsByRegion");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      Map<ScanMetricsRegionInfo, Map<String, Long>> 
concurrentScanMetricsByRegion = new HashMap<>();
+
+      // Trigger two concurrent threads, one of which scans the table while the other 
periodically
+      // collects the scan metrics (along with resetting the counters to 0).
+      Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setCaching(2);
+      try (ResultScanner rs = table.getScanner(scan)) {
+        ScanMetrics scanMetrics = rs.getScanMetrics();
+        AtomicInteger rowsScanned = new AtomicInteger(0);
+        CountDownLatch latch = new CountDownLatch(1);
+        Runnable tableScanner = new Runnable() {
+          public void run() {
+            for (Result r : rs) {
+              Assert.assertFalse(r.isEmpty());
+              rowsScanned.incrementAndGet();
+            }
+            latch.countDown();
+          }
+        };
+        Runnable metricsCollector =
+          getPeriodicScanMetricsCollector(scanMetrics, 
concurrentScanMetricsByRegion, latch);
+        executor.execute(tableScanner);
+        executor.execute(metricsCollector);
+        latch.await();
+        // Merge leftover scan metrics
+        mergeScanMetricsByRegion(scanMetrics.collectMetricsByRegion(),
+          concurrentScanMetricsByRegion);
+        Assert.assertEquals(HBaseTestingUtility.ROWS.length, 
rowsScanned.get());
+      }
+
+      Map<ScanMetricsRegionInfo, Map<String, Long>> 
expectedScanMetricsByRegion;
+
+      // Collect scan metrics by region from single thread. Assert that 
concurrent scan
+      // and metrics collection works as expected.
+      scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setCaching(2);
+      try (ResultScanner rs = table.getScanner(scan)) {
+        ScanMetrics scanMetrics = rs.getScanMetrics();
+        int rowsScanned = 0;
+        for (Result r : rs) {
+          Assert.assertFalse(r.isEmpty());
+          rowsScanned++;
+        }
+        Assert.assertEquals(HBaseTestingUtility.ROWS.length, rowsScanned);
+        expectedScanMetricsByRegion = scanMetrics.collectMetricsByRegion();
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
expectedScanMetricsByRegion
+          .entrySet()) {
+          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+          Map<String, Long> metricsMap = entry.getValue();
+          // Remove millis between nexts metric as it is not deterministic
+          metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+          Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
+          Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
+          Assert.assertEquals(1, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+          // Each region will have 26 * 26 + 26 + 1 rows except last region 
which will have 1 row
+          long rowsScannedFromMetrics = 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+          Assert.assertTrue(
+            rowsScannedFromMetrics == 1 || rowsScannedFromMetrics == (26 * 26 
+ 26 + 1));
+        }
+      }
+
+      // Assert on scan metrics by region
+      Assert.assertEquals(expectedScanMetricsByRegion, 
concurrentScanMetricsByRegion);
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  @Test
+  public void testScanMetricsByRegionWithRegionMove() throws Exception {
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + 
"testScanMetricsByRegionWithRegionMove");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      // Scan 2 regions with start row keys: bbb and ccc
+      byte[] bbb = Bytes.toBytes("bbb");
+      byte[] ccc = Bytes.toBytes("ccc");
+      byte[] ddc = Bytes.toBytes("ddc");
+      long expectedCountOfRowsScannedInMovedRegion = 0;
+      // ROWS is the data loaded by loadTable()
+      for (byte[] row : HBaseTestingUtility.ROWS) {
+        if (Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccc) < 0) {
+          expectedCountOfRowsScannedInMovedRegion++;
+        }
+      }
+      byte[] movedRegion = null;
+      ScanMetrics scanMetrics;
+
+      // Initialize scan with maxResultSize as size of 50 rows.
+      Scan scan = generateScan(bbb, ddc);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setMaxResultSize(8000);
+
+      try (ResultScanner rs = table.getScanner(scan)) {
+        boolean isFirstScanOfRegion = true;
+        for (Result r : rs) {
+          byte[] row = r.getRow();
+          if (isFirstScanOfRegion) {
+            movedRegion = moveRegion(tableName, row);
+            isFirstScanOfRegion = false;
+          }
+        }
+        Assert.assertNotNull(movedRegion);
+
+        scanMetrics = rs.getScanMetrics();
+        Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+          scanMetrics.collectMetricsByRegion();
+        long actualCountOfRowsScannedInMovedRegion = 0;
+        Set<ServerName> serversForMovedRegion = new HashSet<>();
+
+        // 2 regions scanned, with two entries for the first region as it moved in the 
middle of the scan
+        Assert.assertEquals(3, scanMetricsByRegion.size());
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : 
scanMetricsByRegion
+          .entrySet()) {
+          ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+          Map<String, Long> metricsMap = entry.getValue();
+          if 
(scanMetricsRegionInfo.getEncodedRegionName().equals(Bytes.toString(movedRegion)))
 {
+            long rowsScanned = 
metricsMap.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+            actualCountOfRowsScannedInMovedRegion += rowsScanned;
+            serversForMovedRegion.add(scanMetricsRegionInfo.getServerName());
+
+            Assert.assertTrue(metricsMap.get(RPC_RETRIES_METRIC_NAME) == 1
+              || metricsMap.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME) == 
1);
+          }
+          Assert.assertEquals(1, (long) 
metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
+        }
+        Assert.assertEquals(expectedCountOfRowsScannedInMovedRegion,
+          actualCountOfRowsScannedInMovedRegion);
+        Assert.assertEquals(2, serversForMovedRegion.size());
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Splits the region under scan right after the first row is returned and then checks the
+   * per-region scan metrics: three entries whose encoded names match the parent and the two child
+   * regions reported by splitRegion(), the expected total of rows scanned, exactly one entry with
+   * a single RPC retry and exactly one entry with a single NotServingRegionException.
+   */
+  @Test
+  public void testScanMetricsByRegionWithRegionSplit() throws Exception {
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionSplit");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      // Scan 1 region with start row key: bbb
+      byte[] bbb = Bytes.toBytes("bbb");
+      byte[] bmw = Bytes.toBytes("bmw");
+      byte[] ccb = Bytes.toBytes("ccb");
+
+      // ROWS is the data loaded by loadTable(); count the rows lying in [bbb, ccb]
+      long expectedRowCount = 0;
+      for (byte[] row : HBaseTestingUtility.ROWS) {
+        boolean withinRange = Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ccb) <= 0;
+        expectedRowCount += withinRange ? 1 : 0;
+      }
+      Set<String> expectedRegionNames = new HashSet<>();
+
+      // Initialize scan
+      Scan scan = generateScan(bbb, ccb);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setMaxResultSize(8000);
+
+      try (ResultScanner rs = table.getScanner(scan)) {
+        boolean splitTriggered = false;
+        for (Result ignored : rs) {
+          if (splitTriggered) {
+            continue;
+          }
+          // Split exactly once, as soon as the first row has been handed back by the scanner
+          for (byte[] encodedName : splitRegion(tableName, bbb, bmw)) {
+            expectedRegionNames.add(Bytes.toString(encodedName));
+          }
+          splitTriggered = true;
+        }
+
+        ScanMetrics scanMetrics = rs.getScanMetrics();
+        Map<ScanMetricsRegionInfo, Map<String, Long>> metricsByRegion =
+          scanMetrics.collectMetricsByRegion();
+
+        long actualRowCount = 0;
+        long entriesWithOneRpcRetry = 0;
+        long entriesWithOneNsre = 0;
+        Set<String> actualRegionNames = new HashSet<>();
+
+        // 1 entry each for parent and two child regions
+        Assert.assertEquals(3, metricsByRegion.size());
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> e : metricsByRegion.entrySet()) {
+          ScanMetricsRegionInfo regionInfo = e.getKey();
+          Map<String, Long> metrics = e.getValue();
+          long rowsScanned = metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+          actualRowCount += rowsScanned;
+          actualRegionNames.add(regionInfo.getEncodedRegionName());
+
+          entriesWithOneRpcRetry += metrics.get(RPC_RETRIES_METRIC_NAME) == 1 ? 1 : 0;
+          entriesWithOneNsre += metrics.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME) == 1 ? 1 : 0;
+
+          Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
+        }
+        Assert.assertEquals(expectedRowCount, actualRowCount);
+        Assert.assertEquals(1, entriesWithOneRpcRetry);
+        Assert.assertEquals(1, entriesWithOneNsre);
+        Assert.assertEquals(expectedRegionNames, actualRegionNames);
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Merges the two regions starting at bbb and ccc right after the first row is returned and then
+   * checks the per-region scan metrics: two entries (the region the first row came from and the
+   * merged region), the expected total of rows scanned, one RPC retry on the merged region's
+   * entry and one NotServingRegionException on the other entry.
+   */
+  @Test
+  public void testScanMetricsByRegionWithRegionMerge() throws Exception {
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + "testScanMetricsByRegionWithRegionMerge");
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+
+      // Scan 2 regions with start row keys: bbb and ccc
+      byte[] bbb = Bytes.toBytes("bbb");
+      byte[] ccc = Bytes.toBytes("ccc");
+      byte[] ddc = Bytes.toBytes("ddc");
+
+      // ROWS is the data loaded by loadTable(); count the rows lying in [bbb, ddc]
+      long expectedRowCount = 0;
+      for (byte[] row : HBaseTestingUtility.ROWS) {
+        boolean withinRange = Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, ddc) <= 0;
+        expectedRowCount += withinRange ? 1 : 0;
+      }
+      Set<String> allowedRegionNames = new HashSet<>();
+      String mergedName = null;
+
+      // Initialize scan
+      Scan scan = generateScan(bbb, ddc);
+      scan.setEnableScanMetricsByRegion(true);
+      scan.setMaxResultSize(8000);
+
+      try (ResultScanner rs = table.getScanner(scan)) {
+        boolean mergeTriggered = false;
+        for (Result ignored : rs) {
+          if (mergeTriggered) {
+            continue;
+          }
+          // Merge exactly once, as soon as the first row has been handed back by the scanner
+          List<byte[]> regionNames = mergeRegions(tableName, bbb, ccc);
+          // Entry with index 2 is the encoded region name of merged region
+          mergedName = Bytes.toString(regionNames.get(2));
+          for (byte[] encodedName : regionNames) {
+            allowedRegionNames.add(Bytes.toString(encodedName));
+          }
+          mergeTriggered = true;
+        }
+
+        ScanMetrics scanMetrics = rs.getScanMetrics();
+        Map<ScanMetricsRegionInfo, Map<String, Long>> metricsByRegion =
+          scanMetrics.collectMetricsByRegion();
+        long actualRowCount = 0;
+        Set<String> actualRegionNames = new HashSet<>();
+        boolean sawMergedRegion = false;
+
+        // 1 entry each for old region from which first row was scanned and new merged region
+        Assert.assertEquals(2, metricsByRegion.size());
+        for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> e : metricsByRegion.entrySet()) {
+          ScanMetricsRegionInfo regionInfo = e.getKey();
+          Map<String, Long> metrics = e.getValue();
+          long rowsScanned = metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+          actualRowCount += rowsScanned;
+          String encodedName = regionInfo.getEncodedRegionName();
+          actualRegionNames.add(encodedName);
+          if (encodedName.equals(mergedName)) {
+            sawMergedRegion = true;
+            Assert.assertEquals(1, (long) metrics.get(RPC_RETRIES_METRIC_NAME));
+          } else {
+            Assert.assertEquals(1, (long) metrics.get(NOT_SERVING_REGION_EXCEPTION_METRIC_NAME));
+          }
+          Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
+        }
+        Assert.assertEquals(expectedRowCount, actualRowCount);
+        Assert.assertTrue(allowedRegionNames.containsAll(actualRegionNames));
+        Assert.assertTrue(sawMergedRegion);
+      }
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Builds a task that, while the latch count is above zero, repeatedly calls
+   * {@code scanMetrics.collectMetricsByRegion()} and folds the result into the supplied
+   * collection via {@link #mergeScanMetricsByRegion(Map, Map)}, sleeping
+   * {@code RAND.nextInt(10)} milliseconds between rounds.
+   * @param scanMetrics                   Scan metrics to collect the per-region metrics from.
+   * @param scanMetricsByRegionCollection Destination map that accumulates the collected metrics.
+   * @param latch                         Collection keeps running until this latch reaches zero.
+   * @return Task performing the periodic collection. If the thread is interrupted while sleeping,
+   *         the interrupt flag is restored and a RuntimeException wrapping the
+   *         InterruptedException is thrown.
+   */
+  private Runnable getPeriodicScanMetricsCollector(ScanMetrics scanMetrics,
+    Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegionCollection,
+    CountDownLatch latch) {
+    return new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (latch.getCount() > 0) {
+            Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
+              scanMetrics.collectMetricsByRegion();
+            mergeScanMetricsByRegion(scanMetricsByRegion, scanMetricsByRegionCollection);
+            Thread.sleep(RAND.nextInt(10));
+          }
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag before propagating so the owning thread/executor can
+          // still observe that an interruption was requested
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  /**
+   * Folds the per-region metrics of {@code srcMap} into {@code dstMap}. For a region already
+   * present in {@code dstMap} the metric values are summed by metric name; otherwise the source
+   * metrics map is stored as is. The millis-between-nexts metric is removed from every source
+   * metrics map first (so the inner maps of {@code srcMap} are modified) as it is not
+   * deterministic.
+   * @param srcMap Freshly collected scan metrics by region.
+   * @param dstMap Accumulated scan metrics by region, updated in place.
+   */
+  private void mergeScanMetricsByRegion(Map<ScanMetricsRegionInfo, Map<String, Long>> srcMap,
+    Map<ScanMetricsRegionInfo, Map<String, Long>> dstMap) {
+    for (Map.Entry<ScanMetricsRegionInfo, Map<String, Long>> entry : srcMap.entrySet()) {
+      ScanMetricsRegionInfo scanMetricsRegionInfo = entry.getKey();
+      Map<String, Long> metricsMap = entry.getValue();
+      // Remove millis between nexts metric as it is not deterministic
+      metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+      Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
+      if (dstMetricsMap == null) {
+        dstMap.put(scanMetricsRegionInfo, metricsMap);
+        continue;
+      }
+      for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {
+        // merge() inserts the value when the destination has no such metric yet, instead of
+        // failing with a NullPointerException while unboxing a missing value
+        dstMetricsMap.merge(metricEntry.getKey(), metricEntry.getValue(), Long::sum);
+      }
+    }
+  }
+
+  /**
+   * Moves the region containing the given row key from its current region server to some other
+   * region server and asserts that the hosting server actually changed. This is a synchronous
+   * method.
+   * @param tableName Name of the table the region to be moved belongs to.
+   * @param startRow  Row key used to look up the region to be moved.
+   * @return Encoded region name of the region which was moved.
+   * @throws IOException If looking up the region location, moving the region or closing the
+   *                     region locator fails.
+   */
+  private byte[] moveRegion(TableName tableName, byte[] startRow) throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    // The locator is created per call, so release it once the lookups are done
+    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
+      HRegionLocation loc = regionLocator.getRegionLocation(startRow, true);
+      byte[] encodedRegionName = loc.getRegion().getEncodedNameAsBytes();
+      ServerName initialServerName = loc.getServerName();
+
+      admin.move(encodedRegionName);
+
+      // reload=true bypasses the cached location so the post-move server is observed
+      ServerName finalServerName =
+        regionLocator.getRegionLocation(startRow, true).getServerName();
+
+      // Assert that region actually moved
+      Assert.assertNotEquals(initialServerName, finalServerName);
+      return encodedRegionName;
+    }
+  }
+
+  /**
+   * Splits the region containing the start row key at the split key provided and asserts that
+   * the split completed. This is a synchronous method.
+   * @param tableName Table name of region to be split.
+   * @param startRow  Start row key of the region to be split.
+   * @param splitKey  Split key for splitting the region; must lie in the same region as
+   *                  startRow before the split.
+   * @return List of encoded region names with first element being parent region followed by two
+   *         child regions.
+   * @throws IOException If a region lookup, the split itself or closing the region locator fails.
+   */
+  private List<byte[]> splitRegion(TableName tableName, byte[] startRow, byte[] splitKey)
+    throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    // The locator is created per call, so release it once the lookups are done
+    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
+      HRegionLocation topLoc = regionLocator.getRegionLocation(startRow, true);
+      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      ServerName initialTopServerName = topLoc.getServerName();
+      HRegionLocation bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+      ServerName initialBottomServerName = bottomLoc.getServerName();
+
+      // Assert region is ready for split
+      Assert.assertEquals(initialTopServerName, initialBottomServerName);
+      Assert.assertTrue(
+        Bytes.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName));
+
+      // Block until the split future completes
+      FutureUtils.get(admin.splitRegionAsync(initialEncodedTopRegionName, splitKey));
+
+      topLoc = regionLocator.getRegionLocation(startRow, true);
+      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      bottomLoc = regionLocator.getRegionLocation(splitKey, true);
+      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+      // Assert that region split is complete
+      Assert.assertFalse(Bytes.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName));
+      Assert.assertFalse(
+        Bytes.equals(initialEncodedTopRegionName, finalEncodedBottomRegionName));
+      Assert.assertFalse(
+        Bytes.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));
+
+      return Arrays.asList(initialEncodedTopRegionName, finalEncodedTopRegionName,
+        finalEncodedBottomRegionName);
+    }
+  }
+
+  /**
+   * Merges the two regions containing the row keys topRegion and bottomRegion. Asserts that the
+   * regions to be merged are distinct, adjacent regions and that the merge completed. This is a
+   * synchronous method.
+   * @param tableName    Table name of regions to be merged.
+   * @param topRegion    Start row key of first region for merging.
+   * @param bottomRegion Start row key of second region for merging.
+   * @return List of encoded region names with first two elements being original regions followed
+   *         by the merged region.
+   * @throws IOException If a region lookup, the merge itself or closing the region locator fails.
+   */
+  private List<byte[]> mergeRegions(TableName tableName, byte[] topRegion, byte[] bottomRegion)
+    throws IOException {
+    Admin admin = TEST_UTIL.getAdmin();
+    // The locator is created per call, so release it once the lookups are done
+    try (RegionLocator regionLocator = CONN.getRegionLocator(tableName)) {
+      HRegionLocation topLoc = regionLocator.getRegionLocation(topRegion, true);
+      byte[] initialEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      String initialTopRegionEndKey = Bytes.toString(topLoc.getRegion().getEndKey());
+      HRegionLocation bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+      byte[] initialEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+      String initialBottomRegionStartKey = Bytes.toString(bottomLoc.getRegion().getStartKey());
+
+      // Assert that regions are ready to be merged
+      Assert.assertFalse(
+        Bytes.equals(initialEncodedTopRegionName, initialEncodedBottomRegionName));
+      Assert.assertEquals(initialBottomRegionStartKey, initialTopRegionEndKey);
+
+      // Block until the merge future completes
+      FutureUtils.get(admin.mergeRegionsAsync(
+        new byte[][] { initialEncodedTopRegionName, initialEncodedBottomRegionName }, false));
+
+      topLoc = regionLocator.getRegionLocation(topRegion, true);
+      byte[] finalEncodedTopRegionName = topLoc.getRegion().getEncodedNameAsBytes();
+      bottomLoc = regionLocator.getRegionLocation(bottomRegion, true);
+      byte[] finalEncodedBottomRegionName = bottomLoc.getRegion().getEncodedNameAsBytes();
+
+      // Assert regions have been merged successfully
+      Assert.assertTrue(Bytes.equals(finalEncodedTopRegionName, finalEncodedBottomRegionName));
+      Assert.assertFalse(Bytes.equals(initialEncodedTopRegionName, finalEncodedTopRegionName));
+      Assert.assertFalse(
+        Bytes.equals(initialEncodedBottomRegionName, finalEncodedTopRegionName));
+
+      return Arrays.asList(initialEncodedTopRegionName, initialEncodedBottomRegionName,
+        finalEncodedTopRegionName);
+    }
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 93e4a230a22..8cc568c130b 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -17,10 +17,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static 
org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
+import static 
org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -33,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
 import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -246,6 +252,84 @@ public class TestTableSnapshotScanner {
     testScanner(UTIL, "testWithMultiRegion", 20, true);
   }
 
+  /**
+   * Creates a table and snapshot named after the running test method, scans the snapshot from
+   * bbb up to endKey with a TableSnapshotScanner, verifies the scanned rows and returns the
+   * scanner's scan metrics. The scanner is closed, and the snapshot and table are deleted, before
+   * returning.
+   * @param enableScanMetrics         Value passed to Scan#setScanMetricsEnabled.
+   * @param enableScanMetricsByRegion Value passed to Scan#setEnableScanMetricsByRegion.
+   * @param endKey                    Stop row of the scan.
+   * @return Scan metrics reported by the scanner after the scan was verified.
+   */
+  private ScanMetrics createTableSnapshotScannerAndGetScanMetrics(boolean enableScanMetrics,
+    boolean enableScanMetricsByRegion, byte[] endKey) throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName() + "_TABLE");
+    String snapshotName = name.getMethodName() + "_SNAPSHOT";
+    try {
+      createTableAndSnapshot(UTIL, tableName, snapshotName, 50);
+      Path restoreDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan().withStartRow(bbb).withStopRow(endKey);
+      scan.setScanMetricsEnabled(enableScanMetrics);
+      scan.setEnableScanMetricsByRegion(enableScanMetricsByRegion);
+      Configuration conf = UTIL.getConfiguration();
+
+      // Close the scanner once the metrics have been read so it is not leaked; the metrics
+      // reference is obtained before close() runs
+      try (TableSnapshotScanner snapshotScanner =
+        new TableSnapshotScanner(conf, restoreDir, snapshotName, scan)) {
+        verifyScanner(snapshotScanner, bbb, endKey);
+        return snapshotScanner.getScanMetrics();
+      }
+    } finally {
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+    }
+  }
+
+  /** With scan metrics and per-region scan metrics both disabled, no ScanMetrics is returned. */
+  @Test
+  public void testScanMetricsDisabled() throws Exception {
+    Assert.assertNull(createTableSnapshotScannerAndGetScanMetrics(false, false, yyy));
+  }
+
+  /**
+   * With scan metrics enabled but per-region metrics disabled, the rows-scanned metric must equal
+   * the number of HBaseTestingUtility.ROWS entries lying in [bbb, yyy).
+   */
+  @Test
+  public void testScanMetricsWithScanMetricsByRegionDisabled() throws Exception {
+    ScanMetrics metrics = createTableSnapshotScannerAndGetScanMetrics(true, false, yyy);
+    Assert.assertNotNull(metrics);
+
+    int expectedRows = 0;
+    for (byte[] row : HBaseTestingUtility.ROWS) {
+      boolean withinRange = Bytes.compareTo(row, bbb) >= 0 && Bytes.compareTo(row, yyy) < 0;
+      expectedRows += withinRange ? 1 : 0;
+    }
+
+    long actualRows = metrics.getMetricsMap().get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME);
+    Assert.assertEquals(expectedRows, actualRows);
+  }
+
+  /**
+   * Scans [bbb, bbc) with per-region metrics enabled and expects a single per-region entry that
+   * has no server name, a non-null encoded region name, one region scanned and one row scanned.
+   */
+  @Test
+  public void testScanMetricsByRegionForSingleRegion() throws Exception {
+    // Scan single row with row key bbb
+    byte[] bbc = Bytes.toBytes("bbc");
+    ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, true, bbc);
+    Assert.assertNotNull(scanMetrics);
+
+    Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion = scanMetrics.collectMetricsByRegion();
+    Assert.assertEquals(1, byRegion.size());
+    byRegion.forEach((regionInfo, metrics) -> {
+      Assert.assertNull(regionInfo.getServerName());
+      Assert.assertNotNull(regionInfo.getEncodedRegionName());
+      Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
+      Assert.assertEquals(1, (long) metrics.get(COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME));
+    });
+  }
+
+  /**
+   * Scans [bbb, yyy) with per-region metrics enabled and expects every per-region entry to have
+   * no server name, a non-null encoded region name and exactly one region scanned.
+   */
+  @Test
+  public void testScanMetricsByRegionForMultiRegion() throws Exception {
+    ScanMetrics scanMetrics = createTableSnapshotScannerAndGetScanMetrics(true, true, yyy);
+    Assert.assertNotNull(scanMetrics);
+
+    Map<ScanMetricsRegionInfo, Map<String, Long>> byRegion = scanMetrics.collectMetricsByRegion();
+    byRegion.forEach((regionInfo, metrics) -> {
+      Assert.assertNull(regionInfo.getServerName());
+      Assert.assertNotNull(regionInfo.getEncodedRegionName());
+      Assert.assertEquals(1, (long) metrics.get(REGIONS_SCANNED_METRIC_NAME));
+    });
+  }
+
   @Test
   public void testScannerWithRestoreScanner() throws Exception {
     TableName tableName = TableName.valueOf("testScanner");

Reply via email to