This is an automated email from the ASF dual-hosted git repository.

cconnell pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new ef29573fb54 HBASE-29385: Improve performance of AggregrateImplementation quota checks (#7083)
ef29573fb54 is described below

commit ef29573fb54f2131e64b034e361da08876e49b39
Author: Charles Connell <[email protected]>
AuthorDate: Wed Jun 18 08:17:56 2025 -0400

    HBASE-29385: Improve performance of AggregrateImplementation quota checks (#7083)
    
    Signed-off-by: Ray Mattingly <[email protected]>
---
 .../hbase/coprocessor/AggregateImplementation.java | 92 ++++++++++++++--------
 .../coprocessor/TestAggregateImplementation.java   | 23 +++++-
 2 files changed, 83 insertions(+), 32 deletions(-)

diff --git a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 0a107068a43..389e6c013ba 100644
--- a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -29,14 +29,15 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.function.Function;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.ClientUtil;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
@@ -82,13 +83,14 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     RpcCallback<AggregateResponse> done) {
     InternalScanner scanner = null;
     AggregateResponse response = null;
-    PartialResultContext partialResultContext = new PartialResultContext();
+    PartialResultContext partialResultContext = null;
     T max = null;
     boolean hasMoreRows = true;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
+      partialResultContext = newPartialResultContext(scan);
       scanner = env.getRegion().getScanner(scan);
       List<Cell> results = new ArrayList<>();
       byte[] colFamily = scan.getFamilies()[0];
@@ -113,6 +115,11 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       } while (hasMoreRows);
       response = singlePartResponse(request, hasMoreRows, 
partialResultContext, max,
         ci::getProtoForCellType);
+      if (log.isDebugEnabled()) {
+        log.debug("Maximum from this region is {}: {} (partial result: {}) 
(client {})",
+          env.getRegion().getRegionInfo().getRegionNameAsString(), max, 
hasMoreRows,
+          RpcServer.getRequestUser());
+      }
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -121,9 +128,6 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       }
       closeQuota(partialResultContext);
     }
-    log.debug("Maximum from this region is {}: {} (partial result: {}) (client 
{})",
-      env.getRegion().getRegionInfo().getRegionNameAsString(), max, 
hasMoreRows,
-      RpcServer.getRequestUser());
     done.run(response);
   }
 
@@ -138,13 +142,14 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     RpcCallback<AggregateResponse> done) {
     AggregateResponse response = null;
     InternalScanner scanner = null;
-    PartialResultContext partialResultContext = new PartialResultContext();
+    PartialResultContext partialResultContext = null;
     T min = null;
     boolean hasMoreRows = true;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
+      partialResultContext = newPartialResultContext(scan);
       scanner = env.getRegion().getScanner(scan);
       List<Cell> results = new ArrayList<>();
       byte[] colFamily = scan.getFamilies()[0];
@@ -168,6 +173,11 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       } while (hasMoreRows);
       response = singlePartResponse(request, hasMoreRows, 
partialResultContext, min,
         ci::getProtoForCellType);
+      if (log.isDebugEnabled()) {
+        log.debug("Minimum from this region is {}: {} (partial result: {}) 
(client {})",
+          env.getRegion().getRegionInfo().getRegionNameAsString(), min, 
hasMoreRows,
+          RpcServer.getRequestUser());
+      }
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -176,9 +186,6 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       }
       closeQuota(partialResultContext);
     }
-    log.debug("Minimum from this region is {}: {} (partial result: {}) (client 
{})",
-      env.getRegion().getRegionInfo().getRegionNameAsString(), min, 
hasMoreRows,
-      RpcServer.getRequestUser());
     done.run(response);
   }
 
@@ -193,7 +200,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     RpcCallback<AggregateResponse> done) {
     AggregateResponse response = null;
     InternalScanner scanner = null;
-    PartialResultContext partialResultContext = new PartialResultContext();
+    PartialResultContext partialResultContext = null;
     long sum = 0L;
     boolean hasMoreRows = true;
     try {
@@ -201,6 +208,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       S sumVal = null;
       T temp;
       Scan scan = ProtobufUtil.toScan(request.getScan());
+      partialResultContext = newPartialResultContext(scan);
       scanner = env.getRegion().getScanner(scan);
       byte[] colFamily = scan.getFamilies()[0];
       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -226,6 +234,11 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       } while (hasMoreRows);
       response = singlePartResponse(request, hasMoreRows, 
partialResultContext, sumVal,
         ci::getProtoForPromotedType);
+      if (log.isDebugEnabled()) {
+        log.debug("Sum from this region is {}: {} (partial result: {}) (client 
{})",
+          env.getRegion().getRegionInfo().getRegionNameAsString(), sum, 
hasMoreRows,
+          RpcServer.getRequestUser());
+      }
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -234,9 +247,6 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       }
       closeQuota(partialResultContext);
     }
-    log.debug("Sum from this region is {}: {} (partial result: {}) (client 
{})",
-      env.getRegion().getRegionInfo().getRegionNameAsString(), sum, 
hasMoreRows,
-      RpcServer.getRequestUser());
     done.run(response);
   }
 
@@ -251,10 +261,11 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     long counter = 0L;
     List<Cell> results = new ArrayList<>();
     InternalScanner scanner = null;
-    PartialResultContext partialResultContext = new PartialResultContext();
+    PartialResultContext partialResultContext = null;
     boolean hasMoreRows = true;
     try {
       Scan scan = ProtobufUtil.toScan(request.getScan());
+      partialResultContext = newPartialResultContext(scan);
       byte[][] colFamilies = scan.getFamilies();
       byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
       NavigableSet<byte[]> qualifiers =
@@ -282,6 +293,11 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       bb.rewind();
       response = responseBuilder(request, hasMoreRows, partialResultContext)
         .addFirstPart(ByteString.copyFrom(bb)).build();
+      if (log.isDebugEnabled()) {
+        log.debug("Row counter from this region is {}: {} (partial result: {}) 
(client {})",
+          env.getRegion().getRegionInfo().getRegionNameAsString(), counter, 
hasMoreRows,
+          RpcServer.getRequestUser());
+      }
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -290,9 +306,6 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       }
       closeQuota(partialResultContext);
     }
-    log.debug("Row counter from this region is {}: {} (partial result: {}) 
(client {})",
-      env.getRegion().getRegionInfo().getRegionNameAsString(), counter, 
hasMoreRows,
-      RpcServer.getRequestUser());
     done.run(response);
   }
 
@@ -311,12 +324,13 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     RpcCallback<AggregateResponse> done) {
     AggregateResponse response = null;
     InternalScanner scanner = null;
-    PartialResultContext partialResultContext = new PartialResultContext();
+    PartialResultContext partialResultContext = null;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       S sumVal = null;
       Long rowCountVal = 0L;
       Scan scan = ProtobufUtil.toScan(request.getScan());
+      partialResultContext = newPartialResultContext(scan);
       scanner = env.getRegion().getScanner(scan);
       byte[] colFamily = scan.getFamilies()[0];
       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -386,12 +400,13 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     RpcCallback<AggregateResponse> done) {
     InternalScanner scanner = null;
     AggregateResponse response = null;
-    PartialResultContext partialResultContext = new PartialResultContext();
+    PartialResultContext partialResultContext = null;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       S sumVal = null, sumSqVal = null, tempVal = null;
       long rowCountVal = 0L;
       Scan scan = ProtobufUtil.toScan(request.getScan());
+      partialResultContext = newPartialResultContext(scan);
       scanner = env.getRegion().getScanner(scan);
       byte[] colFamily = scan.getFamilies()[0];
       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -467,11 +482,12 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     RpcCallback<AggregateResponse> done) {
     AggregateResponse response = null;
     InternalScanner scanner = null;
-    PartialResultContext partialResultContext = new PartialResultContext();
+    PartialResultContext partialResultContext = null;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
       Scan scan = ProtobufUtil.toScan(request.getScan());
+      partialResultContext = newPartialResultContext(scan);
       scanner = env.getRegion().getScanner(scan);
       byte[] colFamily = scan.getFamilies()[0];
       NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -537,18 +553,34 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
   }
 
   private final static class PartialResultContext {
+    private long rowsRead = 0;
+    private final int quotaCheckInterval;
     private OperationQuota quota = null;
     private long waitIntervalMs = 0;
-    private byte[] lastRowSuccessfullyProcessedArray = null;
-    private int lastRowSuccessfullyProcessedOffset = 0;
-    private int lastRowSuccessfullyProcessedLength = 0;
+    private Cell lastRowSuccessfullyProcessed = null;
     private long previousReadConsumed = 0;
     private long previousReadConsumedDifference = 0;
+
+    private PartialResultContext(int quotaCheckInterval) {
+      this.quotaCheckInterval = quotaCheckInterval;
+    }
+  }
+
+  private PartialResultContext newPartialResultContext(Scan scan) {
+    if (scan.getCaching() > 0) {
+      // If the scan has caching set, we will use that as the quota check 
interval.
+      return new PartialResultContext(scan.getCaching());
+    } else {
+      return new PartialResultContext(
+        env.getConfiguration().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 
1000));
+    }
   }
 
   private boolean shouldBreakForThrottling(AggregateRequest request, Scan scan,
     PartialResultContext context) throws IOException {
-    if (request.getClientSupportsPartialResult()) {
+    if (
+      request.getClientSupportsPartialResult() && context.rowsRead % 
context.quotaCheckInterval == 0
+    ) {
       long maxBlockBytesScanned;
       if (context.quota == null) {
         maxBlockBytesScanned = Long.MAX_VALUE;
@@ -571,14 +603,13 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
   }
 
   private void postScanPartialResultUpdate(List<Cell> results, 
PartialResultContext context) {
+    context.rowsRead += 1;
     if (context.quota != null) {
       context.quota.addScanResultCells(results);
     }
     if (!results.isEmpty()) {
       Cell result = results.get(results.size() - 1);
-      context.lastRowSuccessfullyProcessedArray = result.getRowArray();
-      context.lastRowSuccessfullyProcessedOffset = result.getRowOffset();
-      context.lastRowSuccessfullyProcessedLength = result.getRowLength();
+      context.lastRowSuccessfullyProcessed = result;
     }
   }
 
@@ -605,10 +636,9 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     PartialResultContext context) {
     AggregateResponse.Builder builder = AggregateResponse.newBuilder();
     if (request.getClientSupportsPartialResult() && hasMoreRows) {
-      if (context.lastRowSuccessfullyProcessedArray != null) {
-        byte[] lastRowSuccessfullyProcessed = Arrays.copyOfRange(
-          context.lastRowSuccessfullyProcessedArray, 
context.lastRowSuccessfullyProcessedOffset,
-          context.lastRowSuccessfullyProcessedOffset + 
context.lastRowSuccessfullyProcessedLength);
+      if (context.lastRowSuccessfullyProcessed != null) {
+        byte[] lastRowSuccessfullyProcessed =
+          CellUtil.cloneRow(context.lastRowSuccessfullyProcessed);
         builder.setNextChunkStartRow(ByteString.copyFrom(
           
ClientUtil.calculateTheClosestNextRowKeyForPrefix(lastRowSuccessfullyProcessed)));
       } else {
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
index fc1063fcd9c..6e83a3f83bf 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
@@ -97,7 +97,7 @@ public class TestAggregateImplementation {
     when(region.getRegionInfo()).thenReturn(regionInfo);
     when(regionInfo.getRegionNameAsString()).thenReturn("testRegion");
 
-    scan = new Scan().addColumn(CF, CQ);
+    scan = new Scan().addColumn(CF, CQ).setCaching(1);
 
     scanner = mock(RegionScannerImpl.class);
     doAnswer(createMockScanner()).when(scanner).next(any(List.class));
@@ -528,6 +528,27 @@ public class TestAggregateImplementation {
       response2.hasNextChunkStartRow());
   }
 
+  @Test
+  public void testRowNumWithScannerCaching() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // set caching such that throttles are not triggered
+    scan = new Scan().addColumn(CF, CQ).setCaching(5);
+    request = AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+    aggregate.getRowNum(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    assertEquals(NUM_ROWS, 
response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+  }
+
   @Test
   public void testRowNumThrottleWithNoResults() throws Exception {
     AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))

Reply via email to