This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new cf4630b7f53 HBASE-29301: Fix AggregateImplementation pagination logic 
(#6978)
cf4630b7f53 is described below

commit cf4630b7f5394890f5d1c63eb291fe7f638ffe1f
Author: Charles Connell <[email protected]>
AuthorDate: Wed May 21 08:28:29 2025 -0400

    HBASE-29301: Fix AggregateImplementation pagination logic (#6978)
    
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 hbase-endpoint/pom.xml                             |   4 +
 .../hbase/coprocessor/AggregateImplementation.java | 139 ++-
 .../coprocessor/TestAggregateImplementation.java   | 946 +++++++++++++++++++++
 .../hbase/regionserver/RegionScannerImpl.java      |   2 +-
 4 files changed, 1043 insertions(+), 48 deletions(-)

diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml
index fccac50eea8..f4fdfd4f7fc 100644
--- a/hbase-endpoint/pom.xml
+++ b/hbase-endpoint/pom.xml
@@ -115,6 +115,10 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.stephenc.findbugs</groupId>
+      <artifactId>findbugs-annotations</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.bouncycastle</groupId>
       <artifactId>bcprov-jdk18on</artifactId>
diff --git 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 770a338481f..0a107068a43 100644
--- 
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++ 
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -24,6 +24,7 @@ import com.google.protobuf.Message;
 import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
@@ -32,6 +33,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.function.Function;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -82,7 +84,7 @@ public class AggregateImplementation<T, S, P extends Message, 
Q extends Message,
     AggregateResponse response = null;
     PartialResultContext partialResultContext = new PartialResultContext();
     T max = null;
-    boolean hasMoreRows = false;
+    boolean hasMoreRows = true;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       T temp;
@@ -109,12 +111,8 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
         postScanPartialResultUpdate(results, partialResultContext);
         results.clear();
       } while (hasMoreRows);
-      if (max != null) {
-        AggregateResponse.Builder builder = AggregateResponse.newBuilder();
-        builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
-        setPartialResultResponse(builder, request, hasMoreRows, 
partialResultContext);
-        response = builder.build();
-      }
+      response = singlePartResponse(request, hasMoreRows, 
partialResultContext, max,
+        ci::getProtoForCellType);
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -142,7 +140,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     InternalScanner scanner = null;
     PartialResultContext partialResultContext = new PartialResultContext();
     T min = null;
-    boolean hasMoreRows = false;
+    boolean hasMoreRows = true;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       T temp;
@@ -168,12 +166,8 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
         postScanPartialResultUpdate(results, partialResultContext);
         results.clear();
       } while (hasMoreRows);
-      if (min != null) {
-        AggregateResponse.Builder responseBuilder =
-          
AggregateResponse.newBuilder().addFirstPart(ci.getProtoForCellType(min).toByteString());
-        setPartialResultResponse(responseBuilder, request, hasMoreRows, 
partialResultContext);
-        response = responseBuilder.build();
-      }
+      response = singlePartResponse(request, hasMoreRows, 
partialResultContext, min,
+        ci::getProtoForCellType);
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -201,7 +195,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     InternalScanner scanner = null;
     PartialResultContext partialResultContext = new PartialResultContext();
     long sum = 0L;
-    boolean hasMoreRows = false;
+    boolean hasMoreRows = true;
     try {
       ColumnInterpreter<T, S, P, Q, R> ci = 
constructColumnInterpreterFromRequest(request);
       S sumVal = null;
@@ -230,12 +224,8 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
         postScanPartialResultUpdate(results, partialResultContext);
         results.clear();
       } while (hasMoreRows);
-      if (sumVal != null) {
-        AggregateResponse.Builder responseBuilder = 
AggregateResponse.newBuilder()
-          .addFirstPart(ci.getProtoForPromotedType(sumVal).toByteString());
-        setPartialResultResponse(responseBuilder, request, hasMoreRows, 
partialResultContext);
-        response = responseBuilder.build();
-      }
+      response = singlePartResponse(request, hasMoreRows, 
partialResultContext, sumVal,
+        ci::getProtoForPromotedType);
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -262,7 +252,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     List<Cell> results = new ArrayList<>();
     InternalScanner scanner = null;
     PartialResultContext partialResultContext = new PartialResultContext();
-    boolean hasMoreRows = false;
+    boolean hasMoreRows = true;
     try {
       Scan scan = ProtobufUtil.toScan(request.getScan());
       byte[][] colFamilies = scan.getFamilies();
@@ -290,10 +280,8 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       } while (hasMoreRows);
       ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
       bb.rewind();
-      AggregateResponse.Builder responseBuilder =
-        AggregateResponse.newBuilder().addFirstPart(ByteString.copyFrom(bb));
-      setPartialResultResponse(responseBuilder, request, hasMoreRows, 
partialResultContext);
-      response = responseBuilder.build();
+      response = responseBuilder(request, hasMoreRows, partialResultContext)
+        .addFirstPart(ByteString.copyFrom(bb)).build();
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -337,7 +325,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
         qualifier = qualifiers.pollFirst();
       }
       List<Cell> results = new ArrayList<>();
-      boolean hasMoreRows = false;
+      boolean hasMoreRows = true;
 
       do {
         results.clear();
@@ -353,14 +341,25 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
         rowCountVal++;
         postScanPartialResultUpdate(results, partialResultContext);
       } while (hasMoreRows);
-      if (sumVal != null) {
-        ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
+
+      if (sumVal != null && !request.getClientSupportsPartialResult()) {
         AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+        ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
         pair.addFirstPart(first);
         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
         bb.rewind();
         pair.setSecondPart(ByteString.copyFrom(bb));
-        setPartialResultResponse(pair, request, hasMoreRows, 
partialResultContext);
+        response = pair.build();
+      } else if (request.getClientSupportsPartialResult()) {
+        AggregateResponse.Builder pair =
+          responseBuilder(request, hasMoreRows, partialResultContext);
+        if (sumVal != null) {
+          ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
+          pair.addFirstPart(first);
+          ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
+          bb.rewind();
+          pair.setSecondPart(ByteString.copyFrom(bb));
+        }
         response = pair.build();
       }
     } catch (IOException e) {
@@ -402,7 +401,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       }
       List<Cell> results = new ArrayList<>();
 
-      boolean hasMoreRows = false;
+      boolean hasMoreRows = true;
 
       do {
         if (shouldBreakForThrottling(request, scan, partialResultContext)) {
@@ -421,16 +420,29 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
         sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
         rowCountVal++;
       } while (hasMoreRows);
-      if (sumVal != null) {
+
+      if (sumVal != null && !request.getClientSupportsPartialResult()) {
+        AggregateResponse.Builder pair = AggregateResponse.newBuilder();
         ByteString first_sumVal = 
ci.getProtoForPromotedType(sumVal).toByteString();
         ByteString first_sumSqVal = 
ci.getProtoForPromotedType(sumSqVal).toByteString();
-        AggregateResponse.Builder pair = AggregateResponse.newBuilder();
         pair.addFirstPart(first_sumVal);
         pair.addFirstPart(first_sumSqVal);
         ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
         bb.rewind();
         pair.setSecondPart(ByteString.copyFrom(bb));
-        setPartialResultResponse(pair, request, hasMoreRows, 
partialResultContext);
+        response = pair.build();
+      } else if (request.getClientSupportsPartialResult()) {
+        AggregateResponse.Builder pair =
+          responseBuilder(request, hasMoreRows, partialResultContext);
+        if (sumVal != null) {
+          ByteString first_sumVal = 
ci.getProtoForPromotedType(sumVal).toByteString();
+          ByteString first_sumSqVal = 
ci.getProtoForPromotedType(sumSqVal).toByteString();
+          pair.addFirstPart(first_sumVal);
+          pair.addFirstPart(first_sumSqVal);
+          ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
+          bb.rewind();
+          pair.setSecondPart(ByteString.copyFrom(bb));
+        }
         response = pair.build();
       }
     } catch (IOException e) {
@@ -471,7 +483,7 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
       }
       List<Cell> results = new ArrayList<>();
 
-      boolean hasMoreRows = false;
+      boolean hasMoreRows = true;
       do {
         if (shouldBreakForThrottling(request, scan, partialResultContext)) {
           break;
@@ -493,14 +505,26 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
         sumVal = ci.add(sumVal, tempVal);
         sumWeights = ci.add(sumWeights, tempWeight);
       } while (hasMoreRows);
-      ByteString first_sumVal = 
ci.getProtoForPromotedType(sumVal).toByteString();
-      S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : 
sumWeights;
-      ByteString first_sumWeights = 
ci.getProtoForPromotedType(s).toByteString();
-      AggregateResponse.Builder pair = AggregateResponse.newBuilder();
-      pair.addFirstPart(first_sumVal);
-      pair.addFirstPart(first_sumWeights);
-      setPartialResultResponse(pair, request, hasMoreRows, 
partialResultContext);
-      response = pair.build();
+      if (sumVal != null && !request.getClientSupportsPartialResult()) {
+        AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+        ByteString first_sumVal = 
ci.getProtoForPromotedType(sumVal).toByteString();
+        S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : 
sumWeights;
+        ByteString first_sumWeights = 
ci.getProtoForPromotedType(s).toByteString();
+        pair.addFirstPart(first_sumVal);
+        pair.addFirstPart(first_sumWeights);
+        response = pair.build();
+      } else if (request.getClientSupportsPartialResult()) {
+        AggregateResponse.Builder pair =
+          responseBuilder(request, hasMoreRows, partialResultContext);
+        if (sumVal != null) {
+          ByteString first_sumVal = 
ci.getProtoForPromotedType(sumVal).toByteString();
+          S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) : 
sumWeights;
+          ByteString first_sumWeights = 
ci.getProtoForPromotedType(s).toByteString();
+          pair.addFirstPart(first_sumVal);
+          pair.addFirstPart(first_sumWeights);
+        }
+        response = pair.build();
+      }
     } catch (IOException e) {
       CoprocessorRpcUtils.setControllerException(controller, e);
     } finally {
@@ -558,10 +582,28 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
     }
   }
 
-  private void setPartialResultResponse(AggregateResponse.Builder builder, 
AggregateRequest request,
-    boolean hasMoreRows, PartialResultContext context) throws IOException {
-    // If we encountered an RpcThrottlingException, tell the client the 
partial result we've
-    // accumulated so far, and what row to start scanning at in order to 
finish the scan.
+  @Nullable
+  private <ACC, RES extends Message> AggregateResponse 
singlePartResponse(AggregateRequest request,
+    boolean hasMoreRows, PartialResultContext partialResultContext, ACC acc,
+    Function<ACC, RES> toRes) {
+    AggregateResponse response = null;
+    if (acc != null && !request.getClientSupportsPartialResult()) {
+      ByteString first = toRes.apply(acc).toByteString();
+      response = AggregateResponse.newBuilder().addFirstPart(first).build();
+    } else if (request.getClientSupportsPartialResult()) {
+      AggregateResponse.Builder responseBuilder =
+        responseBuilder(request, hasMoreRows, partialResultContext);
+      if (acc != null) {
+        responseBuilder.addFirstPart(toRes.apply(acc).toByteString());
+      }
+      response = responseBuilder.build();
+    }
+    return response;
+  }
+
+  private AggregateResponse.Builder responseBuilder(AggregateRequest request, 
boolean hasMoreRows,
+    PartialResultContext context) {
+    AggregateResponse.Builder builder = AggregateResponse.newBuilder();
     if (request.getClientSupportsPartialResult() && hasMoreRows) {
       if (context.lastRowSuccessfullyProcessedArray != null) {
         byte[] lastRowSuccessfullyProcessed = Arrays.copyOfRange(
@@ -569,9 +611,12 @@ public class AggregateImplementation<T, S, P extends 
Message, Q extends Message,
           context.lastRowSuccessfullyProcessedOffset + 
context.lastRowSuccessfullyProcessedLength);
         builder.setNextChunkStartRow(ByteString.copyFrom(
           
ClientUtil.calculateTheClosestNextRowKeyForPrefix(lastRowSuccessfullyProcessed)));
+      } else {
+        builder.setNextChunkStartRow(request.getScan().getStartRow());
       }
       builder.setWaitIntervalMs(context.waitIntervalMs);
     }
+    return builder;
   }
 
   @SuppressWarnings("unchecked")
diff --git 
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
 
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
new file mode 100644
index 00000000000..fc1063fcd9c
--- /dev/null
+++ 
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
@@ -0,0 +1,946 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static 
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static 
org.apache.hadoop.hbase.quotas.RpcThrottlingException.Type.ReadSizeExceeded;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import 
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionScannerImpl;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test AggregateImplementation with throttling and partial results
+ */
+@Category({ SmallTests.class, CoprocessorTests.class })
+public class TestAggregateImplementation {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAggregateImplementation.class);
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+  private static final byte[] CQ = Bytes.toBytes("CQ");
+  private static final int NUM_ROWS = 5;
+  private static final int THROTTLE_AT_ROW = 2;
+  private static final LongColumnInterpreter LONG_COLUMN_INTERPRETER = new 
LongColumnInterpreter();
+
+  private AggregateImplementation<Long, Long, HBaseProtos.LongMsg, 
HBaseProtos.LongMsg,
+    HBaseProtos.LongMsg> aggregate;
+  private RegionCoprocessorEnvironment env;
+  private HRegion region;
+  private RegionScannerImpl scanner;
+  private Scan scan;
+  private AggregateRequest request;
+  private RpcController controller;
+
+  @Before
+  public void setUp() throws Exception {
+    env = mock(RegionCoprocessorEnvironment.class);
+    region = mock(HRegion.class);
+    RegionCoprocessorHost host = mock(RegionCoprocessorHost.class);
+    when(env.getRegion()).thenReturn(region);
+    when(region.getCoprocessorHost()).thenReturn(host);
+
+    RegionInfo regionInfo = mock(RegionInfo.class);
+    when(region.getRegionInfo()).thenReturn(regionInfo);
+    when(regionInfo.getRegionNameAsString()).thenReturn("testRegion");
+
+    scan = new Scan().addColumn(CF, CQ);
+
+    scanner = mock(RegionScannerImpl.class);
+    doAnswer(createMockScanner()).when(scanner).next(any(List.class));
+    when(region.getScanner(any())).thenReturn(scanner);
+
+    doAnswer(createMockQuota()).when(env).checkScanQuota(any(), anyLong(), 
anyLong());
+
+    request = AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    controller = mock(RpcController.class);
+
+    aggregate = new AggregateImplementation<>();
+    aggregate.start(env);
+  }
+
+  private Answer<Boolean> createMockScanner() throws IOException {
+    AtomicInteger callCount = new AtomicInteger(0);
+    return invocation -> {
+      List<Cell> results = (List<Cell>) invocation.getArguments()[0];
+      int call = callCount.getAndIncrement();
+      if (call < NUM_ROWS) {
+        Cell cell = mock(Cell.class);
+        when(cell.getRowArray()).thenReturn(Bytes.toBytes("row" + (call + 1)));
+        when(cell.getRowOffset()).thenReturn(0);
+        when(cell.getRowLength()).thenReturn((short) 4);
+
+        when(cell.getValueArray()).thenReturn(Bytes.toBytes((long) call + 1));
+        when(cell.getValueOffset()).thenReturn(0);
+        when(cell.getValueLength()).thenReturn(8);
+        results.add(cell);
+        return call < NUM_ROWS - 1;
+      } else {
+        // No more rows
+        return false;
+      }
+    };
+  }
+
+  private Answer<OperationQuota> createMockQuota() throws IOException {
+    OperationQuota mockQuota = mock(OperationQuota.class);
+
+    final AtomicInteger rowCount = new AtomicInteger(0);
+
+    return invocation -> {
+      int count = rowCount.incrementAndGet();
+      if (count == THROTTLE_AT_ROW) {
+        RpcThrottlingException throttlingEx =
+          new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled for 
testing");
+        throw throttlingEx;
+      }
+      return mockQuota;
+    };
+  }
+
+  private void reset() throws IOException {
+    // Create a non-throttling quota for the second call, since throttling
+    // should only happen on the first call
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+  }
+
+  @Test
+  public void testMaxWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getMax(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+    // Create a second request with the next chunk start row
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttles for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getMax(controller, request2, callback2);
+
+    verify(callback2).run(responseCaptor.capture());
+
+    AggregateResponse response2 = responseCaptor.getValue();
+    b = response2.getFirstPart(0);
+    q = getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals("Final max value should be correct", 5L,
+      (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+    assertFalse("Response should not indicate there are more rows",
+      response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testMaxThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getMax(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testMaxWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getMax(controller, request, callback);
+    verify(callback).run(responseCaptor.capture());
+    AggregateResponse response = responseCaptor.getValue();
+    assertNull(response);
+  }
+
+  @Test
+  public void testMaxDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getMax(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(5L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+  }
+
+  @Test
+  public void testMinWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // First call should get throttled
+    aggregate.getMin(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttles for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getMin(controller, request2, callback2);
+
+    verify(callback2).run(responseCaptor.capture());
+
+    AggregateResponse response2 = responseCaptor.getValue();
+    b = response.getFirstPart(0);
+    q = getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+    assertFalse("Response should not indicate there are more rows",
+      response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testMinThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getMin(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testMinWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getMin(controller, request, callback);
+    verify(callback).run(responseCaptor.capture());
+    AggregateResponse response = responseCaptor.getValue();
+    assertNull(response);
+  }
+
+  @Test
+  public void testMinDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getMin(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+  }
+
+  @Test
+  public void testSumWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getSum(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+    // Create a second request with the next chunk start row
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttles for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getSum(controller, request2, callback2);
+
+    verify(callback2).run(responseCaptor.capture());
+
+    AggregateResponse response2 = responseCaptor.getValue();
+    b = response2.getFirstPart(0);
+    q = getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(14L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+    assertFalse("Response should not indicate there are more rows",
+      response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testSumThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getSum(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testSumWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getSum(controller, request, callback);
+    verify(callback).run(responseCaptor.capture());
+    AggregateResponse response = responseCaptor.getValue();
+    assertNull(response);
+  }
+
+  @Test
+  public void testSumDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getSum(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    ByteString b = response.getFirstPart(0);
+    HBaseProtos.LongMsg q = 
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+    assertEquals(15L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+  }
+
+  @Test
+  public void testRowNumWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // First call should get throttled
+    aggregate.getRowNum(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    assertEquals(THROTTLE_AT_ROW - 1, 
response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttle for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getRowNum(controller, request2, callback2);
+
+    verify(callback2).run(responseCaptor.capture());
+
+    AggregateResponse response2 = responseCaptor.getValue();
+    assertEquals("Final row count should be correct", NUM_ROWS - 
THROTTLE_AT_ROW + 1,
+      response2.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+    assertFalse("Response should not indicate there are more rows",
+      response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testRowNumThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getRowNum(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testRowNumWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getRowNum(controller, request, callback);
+    verify(callback).run(responseCaptor.capture());
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should indicate there are no more rows", 
response.hasNextChunkStartRow());
+    assertEquals(0, response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+  }
+
+  @Test
+  public void testRowNumDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getRowNum(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    assertEquals("Final row count should be correct", NUM_ROWS,
+      response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+  }
+
+  @Test
+  public void testAvgWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // First call should get throttled
+    aggregate.getAvg(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    assertEquals("sum should be 1", 1L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(0))));
+    assertEquals("count should be 1", THROTTLE_AT_ROW - 1,
+      response.getSecondPart().asReadOnlyByteBuffer().getLong());
+
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttle for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getAvg(controller, request2, callback2);
+
+    verify(callback2).run(responseCaptor.capture());
+
+    AggregateResponse response2 = responseCaptor.getValue();
+    assertEquals("sum should be 14", 14L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response2.getFirstPart(0))));
+    assertEquals("count should be 4", NUM_ROWS - THROTTLE_AT_ROW + 1,
+      response2.getSecondPart().asReadOnlyByteBuffer().getLong());
+    assertFalse("Response should not indicate there are more rows",
+      response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testAvgThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getAvg(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testAvgWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getAvg(controller, request, callback);
+    verify(callback).run(responseCaptor.capture());
+    AggregateResponse response = responseCaptor.getValue();
+    assertNull(response);
+  }
+
+  @Test
+  public void testAvgDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getAvg(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    assertEquals("sum should be 15", 15L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(0))));
+    assertEquals("count should be 5", NUM_ROWS,
+      response.getSecondPart().asReadOnlyByteBuffer().getLong());
+  }
+
+  @Test
+  public void testStdWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // First call should get throttled
+    aggregate.getStd(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    assertEquals("sum should be 1", 1L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(0))));
+    assertEquals("sumSq should be 1", 1L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(1))));
+    assertEquals("count should be 1", THROTTLE_AT_ROW - 1,
+      response.getSecondPart().asReadOnlyByteBuffer().getLong());
+
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttle for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getStd(controller, request2, callback2);
+
+    verify(callback2).run(responseCaptor.capture());
+
+    AggregateResponse response2 = responseCaptor.getValue();
+    assertEquals("sum should be 14", 14L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response2.getFirstPart(0))));
+    assertEquals("sumSq should be 54", 54L,
+      (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(getParsedGenericInstance(
+        LONG_COLUMN_INTERPRETER.getClass(), 3, response2.getFirstPart(1))));
+    assertEquals("count should be 4", NUM_ROWS - THROTTLE_AT_ROW + 1,
+      response2.getSecondPart().asReadOnlyByteBuffer().getLong());
+    assertFalse("Response should not indicate there are more rows",
+      response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testStdThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getStd(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testStdWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    aggregate.getStd(controller, request, callback);
+    verify(callback).run(responseCaptor.capture());
+    AggregateResponse response = responseCaptor.getValue();
+    assertNull(response);
+  }
+
+  @Test
+  public void testStdDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getStd(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertFalse("Response should not indicate there are more rows",
+      response.hasNextChunkStartRow());
+    assertEquals("sum should be 15", 15L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(0))));
+    assertEquals("sumSq should be 55", 55L,
+      (long) LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+        getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(1))));
+    assertEquals("count should be 5", NUM_ROWS,
+      response.getSecondPart().asReadOnlyByteBuffer().getLong());
+  }
+
+  @Test
+  public void testMedianWithThrottling() throws Exception {
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // First call should get throttled
+    aggregate.getMedian(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("Wait interval should be set", 1000, 
response.getWaitIntervalMs());
+    assertEquals("sum should be 1", 1L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(0))));
+
+    AggregateRequest request2 = AggregateRequest.newBuilder(request)
+      
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+      .build();
+
+    // Reset throttle for second call
+    reset();
+
+    RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+    aggregate.getMedian(controller, request2, callback2);
+
+    verify(callback2).run(responseCaptor.capture());
+
+    AggregateResponse response2 = responseCaptor.getValue();
+    assertEquals("sum should be 14", 14L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response2.getFirstPart(0))));
+    assertFalse("Response should indicate there are more rows", 
response2.hasNextChunkStartRow());
+  }
+
+  @Test
+  public void testMedianThrottleWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(true).build();
+
+    when(env.checkScanQuota(any(), anyLong(), anyLong()))
+      .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled 
for testing"));
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // call gets no results, and response should contain the same start row as 
the request, because
+    // no progress in the scan was made
+    aggregate.getMedian(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertTrue("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+    assertEquals("response should contain the same start row as the request",
+      request.getScan().getStartRow(), response.getNextChunkStartRow());
+  }
+
+  @Test
+  public void testMedianWithNoResults() throws Exception {
+    AggregateRequest request = 
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+      .setInterpreterClassName(LongColumnInterpreter.class.getName())
+      .setClientSupportsPartialResult(false).build();
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+    aggregate.getMedian(controller, request, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertNull(response);
+  }
+
+  @Test
+  public void testMedianDoesNotSupportPartialResults() throws Exception {
+    AggregateRequest noPartialRequest =
+      AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+        .setInterpreterClassName(LongColumnInterpreter.class.getName())
+        .setClientSupportsPartialResult(false).build();
+
+    OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+    when(env.checkScanQuota(any(), anyLong(), 
anyLong())).thenReturn(nonThrottlingQuota);
+
+    ArgumentCaptor<AggregateResponse> responseCaptor =
+      ArgumentCaptor.forClass(AggregateResponse.class);
+    RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+    // Call should complete without throttling
+    aggregate.getMedian(controller, noPartialRequest, callback);
+
+    verify(callback).run(responseCaptor.capture());
+
+    AggregateResponse response = responseCaptor.getValue();
+
+    assertEquals("sum should be 15", 15L, (long) 
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+      getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, 
response.getFirstPart(0))));
+    assertFalse("Response should indicate there are more rows", 
response.hasNextChunkStartRow());
+  }
+
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 4bebf5d927c..def22fc32a0 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -58,7 +58,7 @@ import 
org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
  * RegionScannerImpl is used to combine scanners from multiple Stores (aka 
column families).
  */
 @InterfaceAudience.Private
-class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
+public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RegionScannerImpl.class);
 

Reply via email to