ndimiduk commented on code in PR #6167:
URL: https://github.com/apache/hbase/pull/6167#discussion_r1740969491
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -92,6 +95,11 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
private static final Logger LOG =
LoggerFactory.getLogger(RawAsyncTableImpl.class);
+ private static final HashedWheelTimer HWT = new HashedWheelTimer(
Review Comment:
Is there an existing HWT that can be used for this purpose? Maybe something
tied to the Connection?
##########
hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java:
##########
@@ -106,6 +126,31 @@ public synchronized void onComplete() {
finished = true;
future.complete(getFinalResult());
}
+
+ @Override
+ public ServiceCaller<AggregateService, AggregateResponse>
+ getNextCallable(AggregateResponse response, RegionInfo region) {
+ if (!response.hasNextChunkStartRow()) {
+ return null;
+ }
+ return (stub, controller, rpcCallback) -> {
+ AggregateRequest.Builder updatedRequest =
AggregateRequest.newBuilder(originalRequest);
+ updatedRequest.setScan(originalRequest.getScan().toBuilder()
+ .setStartRow(response.getNextChunkStartRow()).build());
+ if (log.isTraceEnabled()) {
+ log.trace("Got incomplete result {} for original scan {}. Sending
new request {}.",
Review Comment:
nit: "sending _next_ request" ?
That is, this is considered to be part of the happy path, it is the case
that an aggregation request can span multiple RPCs, so it's not a "new as in
replacement" request, but a "new as in next in the chain of indeterminant
length" request, right?
##########
hbase-endpoint/src/main/java/org/apache/hadoop/hbase/client/coprocessor/AsyncAggregationClient.java:
##########
@@ -131,32 +175,38 @@ private static byte[] nullToEmpty(byte[] b) {
CompletableFuture<R> future = new CompletableFuture<>();
AggregateRequest req;
try {
- req = validateArgAndGetPB(scan, ci, false);
+ req = validateArgAndGetPB(scan, ci, false, true);
} catch (IOException e) {
future.completeExceptionally(e);
return future;
}
- AbstractAggregationCallback<R> callback = new
AbstractAggregationCallback<R>(future) {
+ AbstractAggregationCallback<R> callback =
+ new AbstractAggregationCallback<>(future, req, AggregateService::getMax)
{
- private R max;
+ private R max;
+ private final Object lock = new Object();
Review Comment:
if multiple threads need to coordinate on retrieving this value, should they
be synchronising on a final result? As in, should `getFinalResult` block on
completion of a future Future instead of simply locking around accesses of
`max` ?
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -822,7 +830,30 @@ private <S, R> void onLocateComplete(Function<RpcChannel,
S> stubMaker,
} else {
callback.onRegionComplete(region, r);
}
- if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+
+ boolean complete = unfinishedRequest.decrementAndGet() == 0 &&
locateFinished.get();
+
+ if (e == null && r != null) {
+ ServiceCaller<S, R> updatedCallable = callback.getNextCallable(r,
region);
+ if (updatedCallable != null) {
+ // We are launching a new request now, so un-set complete
+ complete = false;
+ long waitIntervalMs = callback.getWaitIntervalMs(r, region);
+ LOG.trace("Coprocessor returned incomplete result. "
+ + "Sleeping for {} millis before making follow-up request.",
waitIntervalMs);
+ if (waitIntervalMs > 0) {
+ HWT.newTimeout(
+ (timeout) -> onLocateComplete(stubMaker, updatedCallable,
callback, endKey,
+ endKeyInclusive, locateFinished, unfinishedRequest, loc,
null),
+ waitIntervalMs, TimeUnit.MILLISECONDS);
+ } else {
+ onLocateComplete(stubMaker, updatedCallable, callback, endKey,
endKeyInclusive,
+ locateFinished, unfinishedRequest, loc, null);
+ }
+ }
Review Comment:
I'm not very familiar with the Netty call-back code. This listener invokes
again the `onLocateComplete` method, which effectively makes this call
recursive? In that case, does it mean that the `if(complete)` body below will
be potentially invoked multiple times as listeners are popped off the stack?
That is, when `updatedCallable != null`, should the end of this if-block exit
early, rather than executing all the way through to checking for completeness?
Oh. Right. You set `complete = false` inside this block, so that even when
control flow gets down there, nothing happens. I think this would be less
confusing if the method exited after calling `onLocateComplete` rather than
letting control continue through the end of the method.
I dunno. Take this comment with a grain of salt.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]