I forgot to respond to your last question.

I'm looking through the implementation of the following classes/methods:
- RawAsyncTableImpl - onLocateComplete()
- CoprocessorServiceBuilderImpl (inner class of above)  - execute()
- CoprocessorCallback (inner interface of AsyncTable)
- AsyncAggregationClient

So we already use future chaining here. When AsyncAggregationClient is
invoked, a CompletableFuture is created and stashed into the
AbstractAggregationCallback and then
CoprocessorServiceBuilderImpl.execute() is called. In execute(), an async
region lookup occurs and it uses addListener to call the onLocateComplete()
callback when that finishes. In onLocateComplete(), it makes an async call
to the callable from AsyncAggregationClient (i.e. () -> stub.rowCount()).
It uses addListener again to handle the response, calling either
onRegionComplete or onRegionError and decrementing an unfinishedRequest
counter. The AbstractAggregationCallback in AsyncAggregationClient adds an
implementation for onRegionComplete which passes AggregateResponse to
aggregate(). Once unfinishedRequest reaches 0, it calls
callback.onComplete(). The implementation of callback.onComplete() gets the
final aggregated results and passes that into future.complete(). At this
point the end-caller sees their response finish.
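
To make that flow concrete, here is a minimal, self-contained sketch of the
same pattern: one top-level CompletableFuture, several async per-region calls,
and a counter that decides when to complete. Everything in it (FanOutSketch,
fakeRegionCall, the String region names) is a hypothetical stand-in and not
the actual RawAsyncTableImpl code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

class FanOutSketch {
  // Hypothetical stand-in for the per-region async RPC; not an HBase API.
  // It pretends each region holds as many rows as its name has characters.
  static CompletableFuture<Long> fakeRegionCall(String region) {
    return CompletableFuture.supplyAsync(() -> (long) region.length());
  }

  static CompletableFuture<Long> rowCount(List<String> regions) {
    // The future handed back to the end-caller.
    CompletableFuture<Long> future = new CompletableFuture<>();
    if (regions.isEmpty()) {
      future.complete(0L);
      return future;
    }
    AtomicLong total = new AtomicLong();
    AtomicInteger unfinishedRequest = new AtomicInteger(regions.size());
    for (String region : regions) {
      fakeRegionCall(region).whenComplete((resp, err) -> {
        if (err != null) {
          // Plays the role of onRegionError.
          future.completeExceptionally(err);
        } else {
          // Plays the role of onRegionComplete -> aggregate().
          total.addAndGet(resp);
        }
        // Plays the role of onComplete: the last response finishes the future.
        // If the future already failed, this complete() is a no-op.
        if (unfinishedRequest.decrementAndGet() == 0) {
          future.complete(total.get());
        }
      });
    }
    return future;
  }

  public static void main(String[] args) {
    System.out.println(rowCount(Arrays.asList("a", "bb", "ccc")).join());
  }
}
```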

So that explains how you can do a series of async calls before finally
completing the top-level future. Unfortunately the interfaces involved here
make adding the specific chaining we want to do a bit more complicated.

In an ideal world I think we'd be able to update
AbstractAggregationCallback.onRegionComplete() to handle a new
AggregateResponse.isPartial() field. When a response is partial, it'd call
aggregate() as today, but then kick off a follow-up RPC which would go back
through the above future chaining and eventually call aggregate() again with
new data. This process would continue until isPartial() is false.
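
Setting the HBase interfaces aside, the "keep going until isPartial() is
false" loop can be wrapped in a single CompletableFuture. A minimal sketch,
assuming a made-up Response type with a partial flag and a resume cursor
(nextRow), and a fake server that counts at most 3 of 10 rows per request:

```java
import java.util.concurrent.CompletableFuture;

class PartialChainSketch {
  // Hypothetical response shape; the real AggregateResponse has no such fields today.
  static final class Response {
    final long value;      // partial aggregate for this request
    final boolean partial; // true if the server stopped early
    final int nextRow;     // where the next request should resume
    Response(long value, boolean partial, int nextRow) {
      this.value = value;
      this.partial = partial;
      this.nextRow = nextRow;
    }
  }

  static final int TOTAL_ROWS = 10;
  static final int ROWS_PER_CALL = 3;

  // Fake async RPC: counts at most ROWS_PER_CALL rows starting at startRow.
  static CompletableFuture<Response> call(int startRow) {
    return CompletableFuture.supplyAsync(() -> {
      int end = Math.min(startRow + ROWS_PER_CALL, TOTAL_ROWS);
      return new Response(end - startRow, end < TOTAL_ROWS, end);
    });
  }

  // The caller only ever sees this one future.
  static CompletableFuture<Long> countAll() {
    CompletableFuture<Long> future = new CompletableFuture<>();
    step(0, 0L, future);
    return future;
  }

  // Issues one request; on a partial response, aggregates and issues the next.
  private static void step(int startRow, long soFar, CompletableFuture<Long> future) {
    call(startRow).whenComplete((resp, err) -> {
      if (err != null) {
        future.completeExceptionally(err);
        return;
      }
      long total = soFar + resp.value;
      if (resp.partial) {
        step(resp.nextRow, total, future); // follow-up request
      } else {
        future.complete(total);            // only now does the caller finish
      }
    });
  }

  public static void main(String[] args) {
    System.out.println(countAll().join());
  }
}
```

The recursion here is not stack recursion: each step() returns right after
registering its listener, and the next step runs from the completion callback.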

The problem is, we don't have access to the internals of RawAsyncTableImpl
in AbstractAggregationCallback or any other way to influence the flow. So
the first thing we need to do is figure out a change to the interfaces
which would allow a callback to kick off sub-requests. This is sort of
akin to how we handle retries in the other async callers, which I'm
noticing the CoprocessorService stuff does not seem to support at all.

The main aspect of the flow that we need to influence is when
onLocateComplete() calls callback.onComplete() as this triggers the client
to return to the end-caller. One way to do it would be, right after
calling onRegionComplete(), to handle the unfinishedRequest decrement with
something like:

ServiceCaller<S, R> updatedCallable = callback.getNextCallable();
if (updatedCallable != null) {
  // this call is finished, so decrement. but we will kick off a new one
  // below, so don't call onComplete
  unfinishedRequest.decrementAndGet();
  onLocateComplete(stubMaker, updatedCallable, callback, new ArrayList<>(),
    endKey, endKeyInclusive, locateFinished, unfinishedRequest, loc, error);
} else if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
  callback.onComplete();
}

So if there is a next callable, this skips completing the callback and
instead recursively kicks back into onLocateComplete with the updated
callable for the remaining results. Our AbstractAggregationCallback class
would implement getNextCallable(AggregateResponse) with something like this:

@Override
protected ServiceCaller<AggregateService, AggregateResponse> getNextCallable(
  AggregateResponse response) {
  if (!response.isPartial()) {
    return null;
  }
  return (stub, controller, rpcCallback) -> {
    AggregateRequest.Builder updatedRequest =
      AggregateRequest.newBuilder(originalRequest);
    // todo: pull whatever field we want from the aggregate response and set
    //   it in the next request so that the server can continue where it
    //   left off
    stub.getMax(controller, updatedRequest.build(), rpcCallback);
  };
}


So with this approach we need to add a getNextCallable() method to the
CoprocessorCallback interface. I'm not sure if this is the cleanest way to
do it, but it's one way that I think would work.

This does not cover the backoff portion. For that, we similarly do not want
to sleep in an async client. Instead we'd want to use a HashedWheelTimer to
submit the follow-up request async. For example, instead of calling
onLocateComplete directly in my first snippet above, you'd call
HWT.newTimeout(t -> onLocateComplete(...), backoffTime, backoffTimeUnit).
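
A minimal sketch of that non-blocking backoff, with one deliberate swap: it
uses the JDK's ScheduledExecutorService in place of netty's HashedWheelTimer
so that it is self-contained. BackoffSketch and callAfter are hypothetical
names, and the Supplier stands in for the follow-up onLocateComplete(...)
call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class BackoffSketch {
  // Starts `call` after the given delay without blocking the calling thread,
  // and returns a future that mirrors the delayed call's outcome.
  static <T> CompletableFuture<T> callAfter(ScheduledExecutorService timer,
      long delay, TimeUnit unit, Supplier<CompletableFuture<T>> call) {
    CompletableFuture<T> result = new CompletableFuture<>();
    timer.schedule(() -> {
      try {
        call.get().whenComplete((value, err) -> {
          if (err != null) {
            result.completeExceptionally(err);
          } else {
            result.complete(value);
          }
        });
      } catch (Throwable t) {
        // The supplier itself threw before producing a future.
        result.completeExceptionally(t);
      }
    }, delay, unit);
    return result;
  }

  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      String value = BackoffSketch.<String>callAfter(timer, 100, TimeUnit.MILLISECONDS,
          () -> CompletableFuture.completedFuture("follow-up done")).join();
      System.out.println(value);
    } finally {
      timer.shutdown();
    }
  }
}
```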

Since Duo wrote most of this code, I wonder if he has any opinions on the
above recommendations or a different approach. Also, I think
onLocateComplete is sort of broken for coprocessor calls which span more
than 2 regions. It looks like we only handle executing against the first
region and last region, but none of the middle ones. This should probably
be filed as a bug.

On Tue, Jul 16, 2024 at 6:57 PM Charles Connell
<cconn...@hubspot.com.invalid> wrote:

> Hi folks,
>
> I am considering contributing a PR that will allow the
> AggregationClient+AggregateImplementation coprocessor to respect quota
> throttling during its operations, or something in that spirit. I want
> to gather input from the community on your thoughts about this. In
> particular here are some questions:
>
> 1. Is this idea useful enough to be accepted as a contribution?
> 2. Would you prefer a change to the existing coprocessor, or a new
> coprocessor?
> 3. One way to accomplish this is for the coprocessor endpoint to
> return a partial result to the client each time it encounters an
> RpcThrottleException. The client would then sleep and send a new
> request, repeatedly, until the partial results cumulatively
> represented the full scan it wanted. Another way to accomplish this is
> for the coprocessor to sleep on the server side each time it
> encounters an RpcThrottleException, and return a single complete
> result. There are pros and cons to each of these approaches. I don't
> believe there is prior art for either partial results or
> server-side-sleeps in HBase, so both ideas feel awkward to me. What do
> you think?
> 4. If we go with the "partial results" idea, this seems hard to
> accomplish in an async client. Does anyone have an example of how to
> cleanly encapsulate a multi-request-response cycle of async requests
> inside a CompletableFuture? I wonder if this is maybe not possible.
>
> Thanks for your time.
> Charles
>
>
