This is an automated email from the ASF dual-hosted git repository.
cconnell pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new ef29573fb54 HBASE-29385: Improve performance of
AggregateImplementation quota checks (#7083)
ef29573fb54 is described below
commit ef29573fb54f2131e64b034e361da08876e49b39
Author: Charles Connell <[email protected]>
AuthorDate: Wed Jun 18 08:17:56 2025 -0400
HBASE-29385: Improve performance of AggregateImplementation quota checks
(#7083)
Signed-off-by: Ray Mattingly <[email protected]>
---
.../hbase/coprocessor/AggregateImplementation.java | 92 ++++++++++++++--------
.../coprocessor/TestAggregateImplementation.java | 23 +++++-
2 files changed, 83 insertions(+), 32 deletions(-)
diff --git
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
index 0a107068a43..389e6c013ba 100644
---
a/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
+++
b/hbase-endpoint/src/main/java/org/apache/hadoop/hbase/coprocessor/AggregateImplementation.java
@@ -29,14 +29,15 @@ import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import java.util.function.Function;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.ClientUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
@@ -82,13 +83,14 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
T max = null;
boolean hasMoreRows = true;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
byte[] colFamily = scan.getFamilies()[0];
@@ -113,6 +115,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
} while (hasMoreRows);
response = singlePartResponse(request, hasMoreRows,
partialResultContext, max,
ci::getProtoForCellType);
+ if (log.isDebugEnabled()) {
+ log.debug("Maximum from this region is {}: {} (partial result: {})
(client {})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), max,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -121,9 +128,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Maximum from this region is {}: {} (partial result: {}) (client
{})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), max,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -138,13 +142,14 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
T min = null;
boolean hasMoreRows = true;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<>();
byte[] colFamily = scan.getFamilies()[0];
@@ -168,6 +173,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
} while (hasMoreRows);
response = singlePartResponse(request, hasMoreRows,
partialResultContext, min,
ci::getProtoForCellType);
+ if (log.isDebugEnabled()) {
+ log.debug("Minimum from this region is {}: {} (partial result: {})
(client {})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), min,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -176,9 +186,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Minimum from this region is {}: {} (partial result: {}) (client
{})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), min,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -193,7 +200,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
long sum = 0L;
boolean hasMoreRows = true;
try {
@@ -201,6 +208,7 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
S sumVal = null;
T temp;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -226,6 +234,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
} while (hasMoreRows);
response = singlePartResponse(request, hasMoreRows,
partialResultContext, sumVal,
ci::getProtoForPromotedType);
+ if (log.isDebugEnabled()) {
+ log.debug("Sum from this region is {}: {} (partial result: {}) (client
{})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), sum,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -234,9 +247,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Sum from this region is {}: {} (partial result: {}) (client
{})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), sum,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -251,10 +261,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
long counter = 0L;
List<Cell> results = new ArrayList<>();
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
boolean hasMoreRows = true;
try {
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
byte[][] colFamilies = scan.getFamilies();
byte[] colFamily = colFamilies != null ? colFamilies[0] : null;
NavigableSet<byte[]> qualifiers =
@@ -282,6 +293,11 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
bb.rewind();
response = responseBuilder(request, hasMoreRows, partialResultContext)
.addFirstPart(ByteString.copyFrom(bb)).build();
+ if (log.isDebugEnabled()) {
+ log.debug("Row counter from this region is {}: {} (partial result: {})
(client {})",
+ env.getRegion().getRegionInfo().getRegionNameAsString(), counter,
hasMoreRows,
+ RpcServer.getRequestUser());
+ }
} catch (IOException e) {
CoprocessorRpcUtils.setControllerException(controller, e);
} finally {
@@ -290,9 +306,6 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
closeQuota(partialResultContext);
}
- log.debug("Row counter from this region is {}: {} (partial result: {})
(client {})",
- env.getRegion().getRegionInfo().getRegionNameAsString(), counter,
hasMoreRows,
- RpcServer.getRequestUser());
done.run(response);
}
@@ -311,12 +324,13 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
S sumVal = null;
Long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -386,12 +400,13 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
InternalScanner scanner = null;
AggregateResponse response = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
S sumVal = null, sumSqVal = null, tempVal = null;
long rowCountVal = 0L;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -467,11 +482,12 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
RpcCallback<AggregateResponse> done) {
AggregateResponse response = null;
InternalScanner scanner = null;
- PartialResultContext partialResultContext = new PartialResultContext();
+ PartialResultContext partialResultContext = null;
try {
ColumnInterpreter<T, S, P, Q, R> ci =
constructColumnInterpreterFromRequest(request);
S sumVal = null, sumWeights = null, tempVal = null, tempWeight = null;
Scan scan = ProtobufUtil.toScan(request.getScan());
+ partialResultContext = newPartialResultContext(scan);
scanner = env.getRegion().getScanner(scan);
byte[] colFamily = scan.getFamilies()[0];
NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(colFamily);
@@ -537,18 +553,34 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
private final static class PartialResultContext {
+ private long rowsRead = 0;
+ private final int quotaCheckInterval;
private OperationQuota quota = null;
private long waitIntervalMs = 0;
- private byte[] lastRowSuccessfullyProcessedArray = null;
- private int lastRowSuccessfullyProcessedOffset = 0;
- private int lastRowSuccessfullyProcessedLength = 0;
+ private Cell lastRowSuccessfullyProcessed = null;
private long previousReadConsumed = 0;
private long previousReadConsumedDifference = 0;
+
+ private PartialResultContext(int quotaCheckInterval) {
+ this.quotaCheckInterval = quotaCheckInterval;
+ }
+ }
+
+ private PartialResultContext newPartialResultContext(Scan scan) {
+ if (scan.getCaching() > 0) {
+ // If the scan has caching set, we will use that as the quota check
interval.
+ return new PartialResultContext(scan.getCaching());
+ } else {
+ return new PartialResultContext(
+ env.getConfiguration().getInt(HConstants.HBASE_CLIENT_SCANNER_CACHING,
1000));
+ }
}
private boolean shouldBreakForThrottling(AggregateRequest request, Scan scan,
PartialResultContext context) throws IOException {
- if (request.getClientSupportsPartialResult()) {
+ if (
+ request.getClientSupportsPartialResult() && context.rowsRead %
context.quotaCheckInterval == 0
+ ) {
long maxBlockBytesScanned;
if (context.quota == null) {
maxBlockBytesScanned = Long.MAX_VALUE;
@@ -571,14 +603,13 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
}
private void postScanPartialResultUpdate(List<Cell> results,
PartialResultContext context) {
+ context.rowsRead += 1;
if (context.quota != null) {
context.quota.addScanResultCells(results);
}
if (!results.isEmpty()) {
Cell result = results.get(results.size() - 1);
- context.lastRowSuccessfullyProcessedArray = result.getRowArray();
- context.lastRowSuccessfullyProcessedOffset = result.getRowOffset();
- context.lastRowSuccessfullyProcessedLength = result.getRowLength();
+ context.lastRowSuccessfullyProcessed = result;
}
}
@@ -605,10 +636,9 @@ public class AggregateImplementation<T, S, P extends
Message, Q extends Message,
PartialResultContext context) {
AggregateResponse.Builder builder = AggregateResponse.newBuilder();
if (request.getClientSupportsPartialResult() && hasMoreRows) {
- if (context.lastRowSuccessfullyProcessedArray != null) {
- byte[] lastRowSuccessfullyProcessed = Arrays.copyOfRange(
- context.lastRowSuccessfullyProcessedArray,
context.lastRowSuccessfullyProcessedOffset,
- context.lastRowSuccessfullyProcessedOffset +
context.lastRowSuccessfullyProcessedLength);
+ if (context.lastRowSuccessfullyProcessed != null) {
+ byte[] lastRowSuccessfullyProcessed =
+ CellUtil.cloneRow(context.lastRowSuccessfullyProcessed);
builder.setNextChunkStartRow(ByteString.copyFrom(
ClientUtil.calculateTheClosestNextRowKeyForPrefix(lastRowSuccessfullyProcessed)));
} else {
diff --git
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
index fc1063fcd9c..6e83a3f83bf 100644
---
a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
+++
b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAggregateImplementation.java
@@ -97,7 +97,7 @@ public class TestAggregateImplementation {
when(region.getRegionInfo()).thenReturn(regionInfo);
when(regionInfo.getRegionNameAsString()).thenReturn("testRegion");
- scan = new Scan().addColumn(CF, CQ);
+ scan = new Scan().addColumn(CF, CQ).setCaching(1);
scanner = mock(RegionScannerImpl.class);
doAnswer(createMockScanner()).when(scanner).next(any(List.class));
@@ -528,6 +528,27 @@ public class TestAggregateImplementation {
response2.hasNextChunkStartRow());
}
+ @Test
+ public void testRowNumWithScannerCaching() throws Exception {
+ ArgumentCaptor<AggregateResponse> responseCaptor =
+ ArgumentCaptor.forClass(AggregateResponse.class);
+ RpcCallback<AggregateResponse> callback = mock(RpcCallback.class);
+
+ // set caching such that throttles are not triggered
+ scan = new Scan().addColumn(CF, CQ).setCaching(5);
+ request = AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))
+ .setInterpreterClassName(LongColumnInterpreter.class.getName())
+ .setClientSupportsPartialResult(true).build();
+ aggregate.getRowNum(controller, request, callback);
+
+ verify(callback).run(responseCaptor.capture());
+
+ AggregateResponse response = responseCaptor.getValue();
+ assertFalse("Response should not indicate there are more rows",
+ response.hasNextChunkStartRow());
+ assertEquals(NUM_ROWS,
response.getFirstPart(0).asReadOnlyByteBuffer().getLong());
+ }
+
@Test
public void testRowNumThrottleWithNoResults() throws Exception {
AggregateRequest request =
AggregateRequest.newBuilder().setScan(ProtobufUtil.toScan(scan))