This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new f84c7d9de99 HBASE-29494: Capture Scan RPC processing time and queuing time in Scan Metrics (#7279) (#7242)
f84c7d9de99 is described below

commit f84c7d9de9975b6c4e24ab42fad7b8c3e92b7611
Author: sanjeet006py <[email protected]>
AuthorDate: Sat Sep 6 03:24:02 2025 +0530

    HBASE-29494: Capture Scan RPC processing time and queuing time in Scan Metrics (#7279) (#7242)
    
    Signed-off-by: Viraj Jasani <[email protected]>
    Signed-off-by: Hari Krishna Dara <[email protected]>
---
 .../client/metrics/ServerSideScanMetrics.java      | 10 ++++
 .../hadoop/hbase/regionserver/RSRpcServices.java   | 51 +++++++++-------
 .../hadoop/hbase/regionserver/ScannerContext.java  | 16 +++++-
 .../hadoop/hbase/client/TestTableScanMetrics.java  | 67 +++++++++++++++++++++-
 4 files changed, 119 insertions(+), 25 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index f8f5cb1e0bb..8f8b3c783b5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -53,6 +53,8 @@ public class ServerSideScanMetrics {
     currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_BLOCK_CACHE_METRIC_NAME);
     currentRegionScanMetricsData.createCounter(BYTES_READ_FROM_MEMSTORE_METRIC_NAME);
     currentRegionScanMetricsData.createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+    currentRegionScanMetricsData.createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
   }
 
   /**
@@ -71,6 +73,8 @@ public class ServerSideScanMetrics {
     "BYTES_READ_FROM_BLOCK_CACHE";
   public static final String BYTES_READ_FROM_MEMSTORE_METRIC_NAME = "BYTES_READ_FROM_MEMSTORE";
   public static final String BLOCK_READ_OPS_COUNT_METRIC_NAME = "BLOCK_READ_OPS_COUNT";
+  public static final String RPC_SCAN_PROCESSING_TIME_METRIC_NAME = "RPC_SCAN_PROCESSING_TIME";
+  public static final String RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME = "RPC_SCAN_QUEUE_WAIT_TIME";
 
   /**
    * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0
@@ -110,6 +114,12 @@ public class ServerSideScanMetrics {
 
   public final AtomicLong blockReadOpsCount = createCounter(BLOCK_READ_OPS_COUNT_METRIC_NAME);
 
+  public final AtomicLong rpcScanProcessingTime =
+    createCounter(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+
+  public final AtomicLong rpcScanQueueWaitTime =
+    createCounter(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
+
   /**
    * Sets counter with counterName to passed in value, does nothing if counter does not exist. If
    * region level scan metrics are enabled then sets the value of counter for the current region
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 013a3f3b15f..54ffbc043ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.VersionInfoUtil;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
@@ -3323,8 +3324,8 @@ public class RSRpcServices
   // return whether we have more results in region.
   private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
     long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
-    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
-    throws IOException {
+    ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall,
+    ServerSideScanMetrics scanMetrics) throws IOException {
     HRegion region = rsh.r;
     RegionScanner scanner = rsh.s;
     long maxResultSize;
@@ -3377,8 +3378,6 @@ public class RSRpcServices
         final LimitScope timeScope =
           allowHeartbeatMessages ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
 
-        boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
-
         // Configure with limits for this RPC. Set keep progress true since size progress
         // towards size limit should be kept between calls to nextRaw
         ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
@@ -3387,7 +3386,8 @@ public class RSRpcServices
         contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
         contextBuilder.setBatchLimit(scanner.getBatch());
         contextBuilder.setTimeLimit(timeScope, timeLimit);
-        contextBuilder.setTrackMetrics(trackMetrics);
+        contextBuilder.setTrackMetrics(scanMetrics != null);
+        contextBuilder.setScanMetrics(scanMetrics);
         ScannerContext scannerContext = contextBuilder.build();
         boolean limitReached = false;
         while (numOfResults < maxResults) {
@@ -3487,21 +3487,6 @@ public class RSRpcServices
           values.clear();
         }
         builder.setMoreResultsInRegion(moreRows);
-        // Check to see if the client requested that we track metrics server side. If the
-        // client requested metrics, retrieve the metrics from the scanner context.
-        if (trackMetrics) {
-          Map<String, Long> metrics = scannerContext.getMetrics().getMetricsMap();
-          ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
-          NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
-
-          for (Entry<String, Long> entry : metrics.entrySet()) {
-            pairBuilder.setName(entry.getKey());
-            pairBuilder.setValue(entry.getValue());
-            metricBuilder.addMetrics(pairBuilder.build());
-          }
-
-          builder.setScanMetrics(metricBuilder.build());
-        }
       }
     } finally {
       region.closeRegionOperation();
@@ -3651,6 +3636,8 @@ public class RSRpcServices
     boolean scannerClosed = false;
     try {
       List<Result> results = new ArrayList<>(Math.min(rows, 512));
+      boolean trackMetrics = request.hasTrackScanMetrics() && request.getTrackScanMetrics();
+      ServerSideScanMetrics scanMetrics = trackMetrics ? new ServerSideScanMetrics() : null;
       if (rows > 0) {
         boolean done = false;
         // Call coprocessor. Get region info from scanner.
@@ -3667,7 +3654,7 @@ public class RSRpcServices
         }
         if (!done) {
           scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
-            results, builder, lastBlock, rpcCall);
+            results, builder, lastBlock, rpcCall, scanMetrics);
         } else {
           builder.setMoreResultsInRegion(!results.isEmpty());
         }
@@ -3719,6 +3706,28 @@ public class RSRpcServices
         throw new TimeoutIOException("Client deadline exceeded, cannot return results");
       }
 
+      if (scanMetrics != null) {
+        if (rpcCall != null) {
+          long rpcScanTime = EnvironmentEdgeManager.currentTime() - rpcCall.getStartTime();
+          long rpcQueueWaitTime = rpcCall.getStartTime() - rpcCall.getReceiveTime();
+          scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME,
+            rpcScanTime);
+          scanMetrics.addToCounter(ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME,
+            rpcQueueWaitTime);
+        }
+        Map<String, Long> metrics = scanMetrics.getMetricsMap();
+        ScanMetrics.Builder metricBuilder = ScanMetrics.newBuilder();
+        NameInt64Pair.Builder pairBuilder = NameInt64Pair.newBuilder();
+
+        for (Entry<String, Long> entry : metrics.entrySet()) {
+          pairBuilder.setName(entry.getKey());
+          pairBuilder.setValue(entry.getValue());
+          metricBuilder.addMetrics(pairBuilder.build());
+        }
+
+        builder.setScanMetrics(metricBuilder.build());
+      }
+
       return builder.build();
     } catch (IOException e) {
       try {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index ddebe83331d..2fb6ba5afeb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -117,6 +117,11 @@ public class ScannerContext {
   final ServerSideScanMetrics metrics;
 
   ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
+    this(keepProgress, limitsToCopy, trackMetrics, null);
+  }
+
+  ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics,
+    ServerSideScanMetrics scanMetrics) {
     this.limits = new LimitFields();
     if (limitsToCopy != null) {
       this.limits.copy(limitsToCopy);
@@ -127,7 +132,8 @@ public class ScannerContext {
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
-    this.metrics = trackMetrics ? new ServerSideScanMetrics() : null;
+    this.metrics =
+      trackMetrics ? (scanMetrics != null ? scanMetrics : new ServerSideScanMetrics()) : null;
   }
 
   public boolean isTrackingMetrics() {
@@ -396,6 +402,7 @@ public class ScannerContext {
     boolean keepProgress = DEFAULT_KEEP_PROGRESS;
     boolean trackMetrics = false;
     LimitFields limits = new LimitFields();
+    ServerSideScanMetrics scanMetrics = null;
 
     private Builder() {
     }
@@ -432,8 +439,13 @@ public class ScannerContext {
       return this;
     }
 
+    public Builder setScanMetrics(ServerSideScanMetrics scanMetrics) {
+      this.scanMetrics = scanMetrics;
+      return this;
+    }
+
     public ScannerContext build() {
-      return new ScannerContext(keepProgress, limits, trackMetrics);
+      return new ScannerContext(keepProgress, limits, trackMetrics, scanMetrics);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
index 6f84ddcc314..ac3b98a6bf7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableScanMetrics.java
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.NOT_SERVING_REG
 import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.REGIONS_SCANNED_METRIC_NAME;
 import static org.apache.hadoop.hbase.client.metrics.ScanMetrics.RPC_RETRIES_METRIC_NAME;
 import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_PROCESSING_TIME_METRIC_NAME;
+import static org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics.RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
@@ -36,16 +38,20 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.junit.AfterClass;
@@ -57,7 +63,7 @@ import org.junit.experimental.categories.Category;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
-@Category({ ClientTests.class, MediumTests.class })
+@Category({ ClientTests.class, LargeTests.class })
 public class TestTableScanMetrics extends FromClientSideBase {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
@@ -334,6 +340,8 @@ public class TestTableScanMetrics extends FromClientSideBase {
           Map<String, Long> metricsMap = entry.getValue();
           // Remove millis between nexts metric as it is not deterministic
           metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+          metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+          metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
           Assert.assertNotNull(scanMetricsRegionInfo.getEncodedRegionName());
           Assert.assertNotNull(scanMetricsRegionInfo.getServerName());
           Assert.assertEquals(1, (long) metricsMap.get(REGIONS_SCANNED_METRIC_NAME));
@@ -351,6 +359,59 @@ public class TestTableScanMetrics extends FromClientSideBase {
     }
   }
 
+  @Test
+  public void testRPCCallProcessingAndQueueWaitTimeMetrics() throws Exception {
+    final int numThreads = 20;
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // Handler count is 3 by default.
+    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+      HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    // Keep the number of threads to be high enough for RPC calls to queue up. For now going with 6
+    // times the handler count.
+    Assert.assertTrue(numThreads > 6 * handlerCount);
+    ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(numThreads);
+    TableName tableName = TableName.valueOf(
+      TestTableScanMetrics.class.getSimpleName() + "_testRPCCallProcessingAndQueueWaitTimeMetrics");
+    AtomicLong totalScanRpcTime = new AtomicLong(0);
+    AtomicLong totalQueueWaitTime = new AtomicLong(0);
+    CountDownLatch latch = new CountDownLatch(numThreads);
+    try (Table table = TEST_UTIL.createMultiRegionTable(tableName, CF)) {
+      TEST_UTIL.loadTable(table, CF);
+      for (int i = 0; i < numThreads; i++) {
+        executor.execute(new Runnable() {
+          @Override
+          public void run() {
+            try {
+              Scan scan = generateScan(EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY);
+              scan.setEnableScanMetricsByRegion(true);
+              scan.setCaching(2);
+              try (ResultScanner rs = table.getScanner(scan)) {
+                Result r;
+                while ((r = rs.next()) != null) {
+                  Assert.assertFalse(r.isEmpty());
+                }
+                ScanMetrics scanMetrics = rs.getScanMetrics();
+                Map<String, Long> metricsMap = scanMetrics.getMetricsMap();
+                totalScanRpcTime.addAndGet(metricsMap.get(RPC_SCAN_PROCESSING_TIME_METRIC_NAME));
+                totalQueueWaitTime.addAndGet(metricsMap.get(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME));
+              }
+              latch.countDown();
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+      }
+      latch.await();
+      executor.shutdown();
+      executor.awaitTermination(10, TimeUnit.SECONDS);
+      Assert.assertTrue(totalScanRpcTime.get() > 0);
+      Assert.assertTrue(totalQueueWaitTime.get() > 0);
+    } finally {
+      TEST_UTIL.deleteTable(tableName);
+    }
+  }
+
   @Test
   public void testScanMetricsByRegionWithRegionMove() throws Exception {
     TableName tableName = TableName.valueOf(
@@ -591,6 +652,8 @@ public class TestTableScanMetrics extends FromClientSideBase {
       Map<String, Long> metricsMap = entry.getValue();
       // Remove millis between nexts metric as it is not deterministic
       metricsMap.remove(MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+      metricsMap.remove(RPC_SCAN_PROCESSING_TIME_METRIC_NAME);
+      metricsMap.remove(RPC_SCAN_QUEUE_WAIT_TIME_METRIC_NAME);
       if (dstMap.containsKey(scanMetricsRegionInfo)) {
         Map<String, Long> dstMetricsMap = dstMap.get(scanMetricsRegionInfo);
         for (Map.Entry<String, Long> metricEntry : metricsMap.entrySet()) {

Reply via email to