This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new cf4630b7f53 HBASE-29301: Fix AggregrateImplementation pagination logic
(#6978)
cf4630b7f53 is described below
commit cf4630b7f5394890f5d1c63eb291fe7f638ffe1f
Author: Charles Connell <[email protected]>
AuthorDate: Wed May 21 08:28:29 2025 -0400
HBASE-29301: Fix AggregrateImplementation pagination logic (#6978)
Signed-off-by: Nick Dimiduk <[email protected]>
---
hbase-endpoint/pom.xml | 4 +
.../hbase/coprocessor/AggregateImplementation.java | 139 ++-
.../coprocessor/TestAggregateImplementation.java | 946 +++++++++++++++++++++
.../hbase/regionserver/RegionScannerImpl.java | 2 +-
4 files changed, 1043 insertions(+), 48 deletions(-)
diff --git a/hbase-endpoint/pom.xml b/hbase-endpoint/pom.xml
index fccac50eea8..f4fdfd4f7fc 100644
--- a/hbase-endpoint/pom.xml
+++ b/hbase-endpoint/pom.xml
@@ -115,6 +115,10 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.github.stephenc.findbugs</groupId>
+ <artifactId>findbugs-annotations</artifactId>
+ </dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
diff --git
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 770a338481f..0a107068a43 100644
---
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -24,6 +24,7 @@ import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
+import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
@@ -32,6 +33,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
+import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
@@ -82,7 +84,7 @@ public class AggregateImplementation<T, S, P extends Message,
Q extends Message,
AggregateResponse response = null;
PartialResultContext partialResultContext = new PartialResultContext();
T max = null;
- boolean hasMoreRows = false;
+ boolean hasMoreRows = true;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
T temp;
@@ -109,12 +111,8 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
postScanPartialResultUpdate(results, partialResultContext);
results.clear();
} while (hasMoreRows);
- if (max != null) {
- AggregateResponse.Builder builder = AggregateResponse.newBuilder();
- builder.addFirstPart(ci.getProtoForCellType(max).toByteString());
- setPartialResultResponse(builder, request, hasMoreRows,
partialResultContext);
- response = builder.build();
- }
+ response = singlePartResponse(request, hasMoreRows,
partialResultContext, max,
+ ci::getProtoForCellType);
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -142,7 +140,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
InternalScanner scanner = null;
PartialResultContext partialResultContext = new PartialResultContext();
T min = null;
- boolean hasMoreRows = false;
+ boolean hasMoreRows = true;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
T temp;
@@ -168,12 +166,8 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
postScanPartialResultUpdate(results, partialResultContext);
results.clear();
} while (hasMoreRows);
- if (min != null) {
- AggregateResponse.Builder responseBuilder =
-
AggregateResponse.newBuilder().addFirstPart(ci.getProtoForCellType(min).toByteString());
- setPartialResultResponse(responseBuilder, request, hasMoreRows,
partialResultContext);
- response = responseBuilder.build();
- }
+ response = singlePartResponse(request, hasMoreRows,
partialResultContext, min,
+ ci::getProtoForCellType);
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -201,7 +195,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
InternalScanner scanner = null;
PartialResultContext partialResultContext = new PartialResultContext();
long sum = 0L;
- boolean hasMoreRows = false;
+ boolean hasMoreRows = true;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
S sumVal = null;
@@ -230,12 +224,8 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
postScanPartialResultUpdate(results, partialResultContext);
results.clear();
} while (hasMoreRows);
- if (sumVal != null) {
- AggregateResponse.Builder responseBuilder =
AggregateResponse.newBuilder()
- .addFirstPart(ci.getProtoForPromotedType(sumVal).toByteString());
- setPartialResultResponse(responseBuilder, request, hasMoreRows,
partialResultContext);
- response = responseBuilder.build();
- }
+ response = singlePartResponse(request, hasMoreRows,
partialResultContext, sumVal,
+ ci::getProtoForPromotedType);
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -262,7 +252,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
List<Cell> results = new ArrayList<>();
InternalScanner scanner = null;
PartialResultContext partialResultContext = new PartialResultContext();
- boolean hasMoreRows = false;
+ boolean hasMoreRows = true;
try {
Scan scan = ProtobufUtil.toScan(request.getScan());
byte[][] colFamilies = scan.getFamilies();
@@ -290,10 +280,8 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
} while (hasMoreRows);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(counter);
bb.rewind();
- AggregateResponse.Builder responseBuilder =
- AggregateResponse.newBuilder().addFirstPart(ByteString.copyFrom(bb));
- setPartialResultResponse(responseBuilder, request, hasMoreRows,
partialResultContext);
- response = responseBuilder.build();
+ response = responseBuilder(request, hasMoreRows, partialResultContext)
+ .addFirstPart(ByteString.copyFrom(bb)).build();
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -337,7 +325,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
qualifier = qualifiers.pollFirst();
}
List<Cell> results = new ArrayList<>();
- boolean hasMoreRows = false;
+ boolean hasMoreRows = true;
do {
results.clear();
@@ -353,14 +341,25 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
rowCountVal++;
postScanPartialResultUpdate(results, partialResultContext);
} while (hasMoreRows);
- if (sumVal != null) {
- ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
+
+ if (sumVal != null && !request.getClientSupportsPartialResult()) {
AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+ ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
pair.addFirstPart(first);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
- setPartialResultResponse(pair, request, hasMoreRows,
partialResultContext);
+ response = pair.build();
+ } else if (request.getClientSupportsPartialResult()) {
+ AggregateResponse.Builder pair =
+ responseBuilder(request, hasMoreRows, partialResultContext);
+ if (sumVal != null) {
+ ByteString first = ci.getProtoForPromotedType(sumVal).toByteString();
+ pair.addFirstPart(first);
+ ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
+ bb.rewind();
+ pair.setSecondPart(ByteString.copyFrom(bb));
+ }
response = pair.build();
}
} catch (IOException e) {
@@ -402,7 +401,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
List<Cell> results = new ArrayList<>();
- boolean hasMoreRows = false;
+ boolean hasMoreRows = true;
do {
if (shouldBreakForThrottling(request, scan, partialResultContext)) {
@@ -421,16 +420,29 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
sumSqVal = ci.add(sumSqVal, ci.multiply(tempVal, tempVal));
rowCountVal++;
} while (hasMoreRows);
- if (sumVal != null) {
+
+ if (sumVal != null && !request.getClientSupportsPartialResult()) {
+ AggregateResponse.Builder pair = AggregateResponse.newBuilder();
ByteString first_sumVal =
ci.getProtoForPromotedType(sumVal).toByteString();
ByteString first_sumSqVal =
ci.getProtoForPromotedType(sumSqVal).toByteString();
- AggregateResponse.Builder pair = AggregateResponse.newBuilder();
pair.addFirstPart(first_sumVal);
pair.addFirstPart(first_sumSqVal);
ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
bb.rewind();
pair.setSecondPart(ByteString.copyFrom(bb));
- setPartialResultResponse(pair, request, hasMoreRows,
partialResultContext);
+ response = pair.build();
+ } else if (request.getClientSupportsPartialResult()) {
+ AggregateResponse.Builder pair =
+ responseBuilder(request, hasMoreRows, partialResultContext);
+ if (sumVal != null) {
+ ByteString first_sumVal =
ci.getProtoForPromotedType(sumVal).toByteString();
+ ByteString first_sumSqVal =
ci.getProtoForPromotedType(sumSqVal).toByteString();
+ pair.addFirstPart(first_sumVal);
+ pair.addFirstPart(first_sumSqVal);
+ ByteBuffer bb = ByteBuffer.allocate(8).putLong(rowCountVal);
+ bb.rewind();
+ pair.setSecondPart(ByteString.copyFrom(bb));
+ }
response = pair.build();
}
} catch (IOException e) {
@@ -471,7 +483,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
List<Cell> results = new ArrayList<>();
- boolean hasMoreRows = false;
+ boolean hasMoreRows = true;
do {
if (shouldBreakForThrottling(request, scan, partialResultContext)) {
break;
@@ -493,14 +505,26 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
sumVal = ci.add(sumVal, tempVal);
sumWeights = ci.add(sumWeights, tempWeight);
} while (hasMoreRows);
- ByteString first_sumVal =
ci.getProtoForPromotedType(sumVal).toByteString();
- S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) :
sumWeights;
- ByteString first_sumWeights =
ci.getProtoForPromotedType(s).toByteString();
- AggregateResponse.Builder pair = AggregateResponse.newBuilder();
- pair.addFirstPart(first_sumVal);
- pair.addFirstPart(first_sumWeights);
- setPartialResultResponse(pair, request, hasMoreRows,
partialResultContext);
- response = pair.build();
+ if (sumVal != null && !request.getClientSupportsPartialResult()) {
+ AggregateResponse.Builder pair = AggregateResponse.newBuilder();
+ ByteString first_sumVal =
ci.getProtoForPromotedType(sumVal).toByteString();
+ S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) :
sumWeights;
+ ByteString first_sumWeights =
ci.getProtoForPromotedType(s).toByteString();
+ pair.addFirstPart(first_sumVal);
+ pair.addFirstPart(first_sumWeights);
+ response = pair.build();
+ } else if (request.getClientSupportsPartialResult()) {
+ AggregateResponse.Builder pair =
+ responseBuilder(request, hasMoreRows, partialResultContext);
+ if (sumVal != null) {
+ ByteString first_sumVal =
ci.getProtoForPromotedType(sumVal).toByteString();
+ S s = sumWeights == null ? ci.castToReturnType(ci.getMinValue()) :
sumWeights;
+ ByteString first_sumWeights =
ci.getProtoForPromotedType(s).toByteString();
+ pair.addFirstPart(first_sumVal);
+ pair.addFirstPart(first_sumWeights);
+ }
+ response = pair.build();
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -558,10 +582,28 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
}
- private void setPartialResultResponse(AggregateResponse.Builder builder,
AggregateRequest request,
- boolean hasMoreRows, PartialResultContext context) throws IOException {
- // If we encountered an RpcThrottlingException, tell the client the
partial result we've
- // accumulated so far, and what row to start scanning at in order to
finish the scan.
+ @Nullable
+ private <ACC, RES extends Message> AggregateResponse
singlePartResponse(AggregateRequest request,
+ boolean hasMoreRows, PartialResultContext partialResultContext, ACC acc,
+ Function<ACC, RES> toRes) {
+ AggregateResponse response = null;
+ if (acc != null && !request.getClientSupportsPartialResult()) {
+ ByteString first = toRes.apply(acc).toByteString();
+ response = AggregateResponse.newBuilder().addFirstPart(first).build();
+ } else if (request.getClientSupportsPartialResult()) {
+ AggregateResponse.Builder responseBuilder =
+ responseBuilder(request, hasMoreRows, partialResultContext);
+ if (acc != null) {
+ responseBuilder.addFirstPart(toRes.apply(acc).toByteString());
+ }
+ response = responseBuilder.build();
+ }
+ return response;
+ }
+
+ private AggregateResponse.Builder responseBuilder(AggregateRequest request,
boolean hasMoreRows,
+ PartialResultContext context) {
+ AggregateResponse.Builder builder = AggregateResponse.newBuilder();
if (request.getClientSupportsPartialResult() && hasMoreRows) {
if (context.lastRowSuccessfullyProcessedArray != null) {
byte[] lastRowSuccessfullyProcessed = Arrays.copyOfRange(
@@ -569,9 +611,12 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
context.lastRowSuccessfullyProcessedOffset +
context.lastRowSuccessfullyProcessedLength);
builder.setNextChunkStartRow(ByteString.copyFrom(
ClientUtil.calculateTheClosestNextRowKeyForPrefix(lastRowSuccessfullyProcessed)));
+ } else {
+ builder.setNextChunkStartRow(request.getScan().getStartRow());
}
builder.setWaitIntervalMs(context.waitIntervalMs);
}
+ return builder;
}
@SuppressWarnings("unchecked")
diff --git
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
new file mode 100644
index 00000000000..fc1063fcd9c
--- /dev/null
+++
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
@@ -0,0 +1,946 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static
org.apache.hadoop.hbase.client.coprocessor.AggregationHelper.getParsedGenericInstance;
+import static
org.apache.hadoop.hbase.quotas.RpcThrottlingException.Type.ReadSizeExceeded;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;
+import
org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.RegionScannerImpl;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.ArgumentCaptor;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test AggregateImplementation with throttling and partial results
+ */
+@Category({ SmallTests.class, CoprocessorTests.class })
+public class TestAggregateImplementation {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAggregateImplementation.class);
+
+ private static final byte[] CF = Bytes.toBytes("CF");
+ private static final byte[] CQ = Bytes.toBytes("CQ");
+ private static final int NUM_ROWS = 5;
+ private static final int THROTTLE_AT_ROW = 2;
+ private static final LongColumnInterpreter LONG_COLUMN_INTERPRETER = new
LongColumnInterpreter();
+
+ private AggregateImplementation<Long, Long, HBaseProtos.LongMsg,
HBaseProtos.LongMsg,
+ HBaseProtos.LongMsg> aggregate;
+ private RegionCoprocessorEnvironment env;
+ private HRegion region;
+ private RegionScannerImpl scanner;
+ private Scan scan;
+ private AggregateRequest request;
+ private RpcController controller;
+
+ @Before
+ public void setUp() throws Exception {
+ env = mock(RegionCoprocessorEnvironment.class);
+ region = mock(HRegion.class);
+ RegionCoprocessorHost host = mock(RegionCoprocessorHost.class);
+ when(env.getRegion()).thenReturn(region);
+ when(region.getCoprocessorHost()).thenReturn(host);
+
+ RegionInfo regionInfo = mock(RegionInfo.class);
+ when(region.getRegionInfo()).thenReturn(regionInfo);
+ when(regionInfo.getRegionNameAsString()).thenReturn("testRegion");
+
+ scan = new Scan().addColumn(CF, CQ);
+
+ scanner = mock(RegionScannerImpl.class);
+ doAnswer(createMockScanner()).when(scanner).next(any(List.class));
+ when(region.getScanner(any())).thenReturn(scanner);
+
+ doAnswer(createMockQuota()).when(env).checkScanQuota(any(), anyLong(),
anyLong());
+
+ request = AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ controller = mock(RpcController.class);
+
+ aggregate = new AggregateImplementation<>();
+ aggregate.start(env);
+ }
+
+ private Answer<Boolean> createMockScanner() throws IOException {
+ AtomicInteger callCount = new AtomicInteger(0);
+ return invocation -> {
+ List<Cell> results = (List<Cell>) invocation.getArguments()[0];
+ int call = callCount.getAndIncrement();
+ if (call < NUM_ROWS) {
+ Cell cell = mock(Cell.class);
+ when(cell.getRowArray()).thenReturn(Bytes.toBytes("row" + (call + 1)));
+ when(cell.getRowOffset()).thenReturn(0);
+ when(cell.getRowLength()).thenReturn((short) 4);
+
+ when(cell.getValueArray()).thenReturn(Bytes.toBytes((long) call + 1));
+ when(cell.getValueOffset()).thenReturn(0);
+ when(cell.getValueLength()).thenReturn(8);
+ results.add(cell);
+ return call < NUM_ROWS - 1;
+ } else {
+ // No more rows
+ return false;
+ }
+ };
+ }
+
+ private Answer<OperationQuota> createMockQuota() throws IOException {
+ OperationQuota mockQuota = mock(OperationQuota.class);
+
+ final AtomicInteger rowCount = new AtomicInteger(0);
+
+ return invocation -> {
+ int count = rowCount.incrementAndGet();
+ if (count == THROTTLE_AT_ROW) {
+ RpcThrottlingException throttlingEx =
+ new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled for
testing");
+ throw throttlingEx;
+ }
+ return mockQuota;
+ };
+ }
+
+ private void reset() throws IOException {
+ // Create a non-throttling quota for the second call, since throttling
+ // should only happen on the first call
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+ }
+
+ @Test
+ public void testMaxWithThrottling() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getMax(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("Wait interval should be set", 1000,
response.getWaitIntervalMs());
+ ByteString b = response.getFirstPart(0);
+ HBaseProtos.LongMsg q =
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+ // Create a second request with the next chunk start row
+ AggregateRequest request2 = AggregateRequest.newBuilder(request)
+
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+ .build();
+
+ // Reset throttles for second call
+ reset();
+
+ RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+ aggregate.getMax(controller, request2, callback2);
+
+ verify(callback2).run(responseCaptor.capture());
+
+ AggregateResponse response2 = responseCaptor.getValue();
+ b = response2.getFirstPart(0);
+ q = getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals("Final max value should be correct", 5L,
+ (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+ assertFalse("Response should not indicate there are more rows",
+ response2.hasNextChunkStartRow());
+ }
+
+ @Test
+ public void testMaxThrottleWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ when(env.checkScanQuota(any(), anyLong(), anyLong()))
+ .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled
for testing"));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // call gets no results, and response should contain the same start row as
the request, because
+ // no progress in the scan was made
+ aggregate.getMax(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("response should contain the same start row as the request",
+ request.getScan().getStartRow(), response.getNextChunkStartRow());
+ }
+
+ @Test
+ public void testMaxWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getMax(controller, request, callback);
+ verify(callback).run(responseCaptor.capture());
+ AggregateResponse response = responseCaptor.getValue();
+ assertNull(response);
+ }
+
+ @Test
+ public void testMaxDoesNotSupportPartialResults() throws Exception {
+ AggregateRequest noPartialRequest =
+ AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // Call should complete without throttling
+ aggregate.getMax(controller, noPartialRequest, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ ByteString b = response.getFirstPart(0);
+ HBaseProtos.LongMsg q =
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(5L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+ }
+
+ @Test
+ public void testMinWithThrottling() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // First call should get throttled
+ aggregate.getMin(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("Wait interval should be set", 1000,
response.getWaitIntervalMs());
+ ByteString b = response.getFirstPart(0);
+ HBaseProtos.LongMsg q =
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+ AggregateRequest request2 = AggregateRequest.newBuilder(request)
+
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+ .build();
+
+ // Reset throttles for second call
+ reset();
+
+ RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+ aggregate.getMin(controller, request2, callback2);
+
+ verify(callback2).run(responseCaptor.capture());
+
+ AggregateResponse response2 = responseCaptor.getValue();
+ b = response.getFirstPart(0);
+ q = getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+ assertFalse("Response should not indicate there are more rows",
+ response2.hasNextChunkStartRow());
+ }
+
+ @Test
+ public void testMinThrottleWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ when(env.checkScanQuota(any(), anyLong(), anyLong()))
+ .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled
for testing"));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // call gets no results, and response should contain the same start row as
the request, because
+ // no progress in the scan was made
+ aggregate.getMin(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("response should contain the same start row as the request",
+ request.getScan().getStartRow(), response.getNextChunkStartRow());
+ }
+
+ @Test
+ public void testMinWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getMin(controller, request, callback);
+ verify(callback).run(responseCaptor.capture());
+ AggregateResponse response = responseCaptor.getValue();
+ assertNull(response);
+ }
+
+ @Test
+ public void testMinDoesNotSupportPartialResults() throws Exception {
+ AggregateRequest noPartialRequest =
+ AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // Call should complete without throttling
+ aggregate.getMin(controller, noPartialRequest, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ ByteString b = response.getFirstPart(0);
+ HBaseProtos.LongMsg q =
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+ }
+
+ @Test
+ public void testSumWithThrottling() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getSum(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("Wait interval should be set", 1000,
response.getWaitIntervalMs());
+ ByteString b = response.getFirstPart(0);
+ HBaseProtos.LongMsg q =
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(1L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+
+ // Create a second request with the next chunk start row
+ AggregateRequest request2 = AggregateRequest.newBuilder(request)
+
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+ .build();
+
+ // Reset throttles for second call
+ reset();
+
+ RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+ aggregate.getSum(controller, request2, callback2);
+
+ verify(callback2).run(responseCaptor.capture());
+
+ AggregateResponse response2 = responseCaptor.getValue();
+ b = response2.getFirstPart(0);
+ q = getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(14L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+ assertFalse("Response should not indicate there are more rows",
+ response2.hasNextChunkStartRow());
+ }
+
+ @Test
+ public void testSumThrottleWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ when(env.checkScanQuota(any(), anyLong(), anyLong()))
+ .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled
for testing"));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // call gets no results, and response should contain the same start row as
the request, because
+ // no progress in the scan was made
+ aggregate.getSum(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("response should contain the same start row as the request",
+ request.getScan().getStartRow(), response.getNextChunkStartRow());
+ }
+
+ @Test
+ public void testSumWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getSum(controller, request, callback);
+ verify(callback).run(responseCaptor.capture());
+ AggregateResponse response = responseCaptor.getValue();
+ assertNull(response);
+ }
+
+ @Test
+ public void testSumDoesNotSupportPartialResults() throws Exception {
+ AggregateRequest noPartialRequest =
+ AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // Call should complete without throttling
+ aggregate.getSum(controller, noPartialRequest, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ ByteString b = response.getFirstPart(0);
+ HBaseProtos.LongMsg q =
getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3, b);
+ assertEquals(15L, (long) LONG_COLUMN_INTERPRETER.getCellValueFromProto(q));
+ }
+
+ @Test
+ public void testRowNumWithThrottling() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // First call should get throttled
+ aggregate.getRowNum(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("Wait interval should be set", 1000,
response.getWaitIntervalMs());
+ assertEquals(THROTTLE_AT_ROW - 1,
response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+
+ AggregateRequest request2 = AggregateRequest.newBuilder(request)
+
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+ .build();
+
+ // Reset throttle for second call
+ reset();
+
+ RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+ aggregate.getRowNum(controller, request2, callback2);
+
+ verify(callback2).run(responseCaptor.capture());
+
+ AggregateResponse response2 = responseCaptor.getValue();
+ assertEquals("Final row count should be correct", NUM_ROWS -
THROTTLE_AT_ROW + 1,
+ response2.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+ assertFalse("Response should not indicate there are more rows",
+ response2.hasNextChunkStartRow());
+ }
+
+ @Test
+ public void testRowNumThrottleWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ when(env.checkScanQuota(any(), anyLong(), anyLong()))
+ .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled
for testing"));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // call gets no results, and response should contain the same start row as
the request, because
+ // no progress in the scan was made
+ aggregate.getRowNum(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("response should contain the same start row as the request",
+ request.getScan().getStartRow(), response.getNextChunkStartRow());
+ }
+
+ @Test
+ public void testRowNumWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getRowNum(controller, request, callback);
+ verify(callback).run(responseCaptor.capture());
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertFalse("Response should indicate there are no more rows",
response.hasNextChunkStartRow());
+ assertEquals(0, response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+ }
+
+ @Test
+ public void testRowNumDoesNotSupportPartialResults() throws Exception {
+ AggregateRequest noPartialRequest =
+ AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // Call should complete without throttling
+ aggregate.getRowNum(controller, noPartialRequest, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ assertEquals("Final row count should be correct", NUM_ROWS,
+ response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+ }
+
+ @Test
+ public void testAvgWithThrottling() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // First call should get throttled
+ aggregate.getAvg(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("Wait interval should be set", 1000,
response.getWaitIntervalMs());
+ assertEquals("sum should be 1", 1L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(0))));
+ assertEquals("count should be 1", THROTTLE_AT_ROW - 1,
+ response.getSecondPart().asReadOnlyByteBuffer().getLong());
+
+ AggregateRequest request2 = AggregateRequest.newBuilder(request)
+
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+ .build();
+
+ // Reset throttle for second call
+ reset();
+
+ RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+ aggregate.getAvg(controller, request2, callback2);
+
+ verify(callback2).run(responseCaptor.capture());
+
+ AggregateResponse response2 = responseCaptor.getValue();
+ assertEquals("sum should be 14", 14L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response2.getFirstPart(0))));
+ assertEquals("count should be 4", NUM_ROWS - THROTTLE_AT_ROW + 1,
+ response2.getSecondPart().asReadOnlyByteBuffer().getLong());
+ assertFalse("Response should not indicate there are more rows",
+ response2.hasNextChunkStartRow());
+ }
+
+ @Test
+ public void testAvgThrottleWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ when(env.checkScanQuota(any(), anyLong(), anyLong()))
+ .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled
for testing"));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // call gets no results, and response should contain the same start row as
the request, because
+ // no progress in the scan was made
+ aggregate.getAvg(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("response should contain the same start row as the request",
+ request.getScan().getStartRow(), response.getNextChunkStartRow());
+ }
+
+ @Test
+ public void testAvgWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getAvg(controller, request, callback);
+ verify(callback).run(responseCaptor.capture());
+ AggregateResponse response = responseCaptor.getValue();
+ assertNull(response);
+ }
+
+ @Test
+ public void testAvgDoesNotSupportPartialResults() throws Exception {
+ AggregateRequest noPartialRequest =
+ AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // Call should complete without throttling
+ aggregate.getAvg(controller, noPartialRequest, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ assertEquals("sum should be 15", 15L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(0))));
+ assertEquals("count should be 5", NUM_ROWS,
+ response.getSecondPart().asReadOnlyByteBuffer().getLong());
+ }
+
+ @Test
+ public void testStdWithThrottling() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // First call should get throttled
+ aggregate.getStd(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("Wait interval should be set", 1000,
response.getWaitIntervalMs());
+ assertEquals("sum should be 1", 1L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(0))));
+ assertEquals("sumSq should be 1", 1L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(1))));
+ assertEquals("count should be 1", THROTTLE_AT_ROW - 1,
+ response.getSecondPart().asReadOnlyByteBuffer().getLong());
+
+ AggregateRequest request2 = AggregateRequest.newBuilder(request)
+
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+ .build();
+
+ // Reset throttle for second call
+ reset();
+
+ RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+ aggregate.getStd(controller, request2, callback2);
+
+ verify(callback2).run(responseCaptor.capture());
+
+ AggregateResponse response2 = responseCaptor.getValue();
+ assertEquals("sum should be 14", 14L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response2.getFirstPart(0))));
+ assertEquals("sumSq should be 54", 54L,
+ (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(getParsedGenericInstance(
+ LONG_COLUMN_INTERPRETER.getClass(), 3, response2.getFirstPart(1))));
+ assertEquals("count should be 4", NUM_ROWS - THROTTLE_AT_ROW + 1,
+ response2.getSecondPart().asReadOnlyByteBuffer().getLong());
+ assertFalse("Response should not indicate there are more rows",
+ response2.hasNextChunkStartRow());
+ }
+
+ @Test
+ public void testStdThrottleWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ when(env.checkScanQuota(any(), anyLong(), anyLong()))
+ .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled
for testing"));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // call gets no results, and response should contain the same start row as
the request, because
+ // no progress in the scan was made
+ aggregate.getStd(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("response should contain the same start row as the request",
+ request.getScan().getStartRow(), response.getNextChunkStartRow());
+ }
+
+ @Test
+ public void testStdWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ aggregate.getStd(controller, request, callback);
+ verify(callback).run(responseCaptor.capture());
+ AggregateResponse response = responseCaptor.getValue();
+ assertNull(response);
+ }
+
+ @Test
+ public void testStdDoesNotSupportPartialResults() throws Exception {
+ AggregateRequest noPartialRequest =
+ AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // Call should complete without throttling
+ aggregate.getStd(controller, noPartialRequest, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ assertEquals("sum should be 15", 15L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(0))));
+ assertEquals("sumSq should be 55", 55L,
+ (long) LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(1))));
+ assertEquals("count should be 5", NUM_ROWS,
+ response.getSecondPart().asReadOnlyByteBuffer().getLong());
+ }
+
+ @Test
+ public void testMedianWithThrottling() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // First call should get throttled
+ aggregate.getMedian(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("Wait interval should be set", 1000,
response.getWaitIntervalMs());
+ assertEquals("sum should be 1", 1L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(0))));
+
+ AggregateRequest request2 = AggregateRequest.newBuilder(request)
+
.setScan(request.getScan().toBuilder().setStartRow(response.getNextChunkStartRow()).build())
+ .build();
+
+ // Reset throttle for second call
+ reset();
+
+ RpcCallback<AggregateResponse> callback2 = mock(RpcCallback.class);
+ aggregate.getMedian(controller, request2, callback2);
+
+ verify(callback2).run(responseCaptor.capture());
+
+ AggregateResponse response2 = responseCaptor.getValue();
+ assertEquals("sum should be 14", 14L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response2.getFirstPart(0))));
+ assertFalse("Response should indicate there are more rows",
response2.hasNextChunkStartRow());
+ }
+
+ @Test
+ public void testMedianThrottleWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+
+ when(env.checkScanQuota(any(), anyLong(), anyLong()))
+ .thenThrow(new RpcThrottlingException(ReadSizeExceeded, 1000, "Throttled
for testing"));
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // call gets no results, and response should contain the same start row as
the request, because
+ // no progress in the scan was made
+ aggregate.getMedian(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertTrue("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ assertEquals("response should contain the same start row as the request",
+ request.getScan().getStartRow(), response.getNextChunkStartRow());
+ }
+
+ @Test
+ public void testMedianWithNoResults() throws Exception {
+ AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ doAnswer(invocation -> false).when(scanner).next(any(List.class));
+
+ aggregate.getMedian(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertNull(response);
+ }
+
+ @Test
+ public void testMedianDoesNotSupportPartialResults() throws Exception {
+ AggregateRequest noPartialRequest =
+ AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(false).build();
+
+ OperationQuota nonThrottlingQuota = mock(OperationQuota.class);
+ when(env.checkScanQuota(any(), anyLong(),
anyLong())).thenReturn(nonThrottlingQuota);
+
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // Call should complete without throttling
+ aggregate.getMedian(controller, noPartialRequest, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+
+ assertEquals("sum should be 15", 15L, (long)
LONG_COLUMN_INTERPRETER.getPromotedValueFromProto(
+ getParsedGenericInstance(LONG_COLUMN_INTERPRETER.getClass(), 3,
response.getFirstPart(0))));
+ assertFalse("Response should indicate there are more rows",
response.hasNextChunkStartRow());
+ }
+
+}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index 4bebf5d927c..def22fc32a0 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -58,7 +58,7 @@ import
org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
* RegionScannerImpl is used to combine scanners from multiple Stores (aka
column families).
*/
@InterfaceAudience.Private
-class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
+public class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
private static final Logger LOG =
LoggerFactory.getLogger(RegionScannerImpl.class);