I forgot to respond to your last question. I'm looking through the implementation of the following classes/methods:

- RawAsyncTableImpl - onLocateComplete()
- CoprocessorServiceBuilderImpl (inner class of above) - execute()
- CoprocessorCallback (inner interface of AsyncTable)
- AsyncAggregationClient
So we already use future chaining here. When AsyncAggregationClient is invoked, a CompletableFuture is created and stashed in the AbstractAggregationCallback, and then CoprocessorServiceBuilderImpl.execute() is called. In execute(), an async region lookup occurs, and addListener is used to invoke the onLocateComplete() callback when that finishes. In onLocateComplete(), an async call is made to the callable from AsyncAggregationClient (i.e. () -> stub.rowCount()). addListener is used again to handle the response, calling either onRegionComplete or onRegionError and decrementing an unfinishedRequest counter. The AbstractAggregationCallback in AsyncAggregationClient provides an implementation of onRegionComplete which passes the AggregateResponse to aggregate(). Once unfinishedRequests reaches 0, callback.onComplete() is called. The implementation of callback.onComplete() takes the final aggregated result and passes it into future.complete(). At this point the end-caller sees their response finish. So that explains how you can do a series of async calls before finally completing the top-level future.

Unfortunately, the interfaces involved here make adding the specific chaining we want a bit more complicated. In an ideal world, I think we'd be able to update AbstractAggregationCallback.onRegionComplete() to handle a new AggregateResponse.isPartial() field. In that case, it would call aggregate() as it does today, but then kick off a follow-up RPC which would go back through the above future chaining and eventually call aggregate() again with the new data. This process would continue until isPartial() is false. The problem is that AbstractAggregationCallback has no access to the internals of RawAsyncTableImpl, nor any other way to influence the flow. So the first thing we need to do is figure out a change to the interfaces that would allow a callback to kick off sub-requests.
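To make that flow concrete, here's a dependency-free sketch of the same pattern: per-region async calls feed a callback, and the callback completes the single future the end-caller holds once an unfinished-request counter hits zero. The class and method names here are simplified stand-ins of mine, not the real HBase types.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class AggregationFlowSketch {

  /** Stand-in for AbstractAggregationCallback: aggregates partial results. */
  static class SumCallback {
    private final CompletableFuture<Long> future; // stashed future, completed at the end
    private final AtomicLong sum = new AtomicLong();

    SumCallback(CompletableFuture<Long> future) {
      this.future = future;
    }

    void onRegionComplete(long partial) { // like onRegionComplete + aggregate()
      sum.addAndGet(partial);
    }

    void onComplete() { // called once unfinishedRequests == 0
      future.complete(sum.get());
    }
  }

  /** Stand-in for execute()/onLocateComplete(): one async call per "region". */
  static CompletableFuture<Long> sumAllRegions(List<Long> perRegionValues) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    SumCallback callback = new SumCallback(future);
    AtomicInteger unfinishedRequests = new AtomicInteger(perRegionValues.size());
    for (long value : perRegionValues) {
      // supplyAsync stands in for the per-region RPC; whenComplete stands in
      // for the addListener callback in the real code.
      CompletableFuture.supplyAsync(() -> value).whenComplete((partial, error) -> {
        callback.onRegionComplete(partial);
        if (unfinishedRequests.decrementAndGet() == 0) {
          callback.onComplete(); // the end-caller's future finishes here
        }
      });
    }
    return future;
  }

  public static void main(String[] args) throws Exception {
    // The end-caller only ever sees the one future.
    System.out.println(sumAllRegions(List.of(1L, 2L, 3L)).get()); // prints 6
  }
}
```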
This is sort of akin to how we handle retries in the other async callers, which I'm noticing the CoprocessorService code does not seem to support at all. The main aspect of the flow we need to influence is when onLocateComplete() calls callback.onComplete(), as this triggers the client to return to the end-caller. One way to do it would be, right after calling onRegionComplete(), to ask the callback for a follow-up call before decrementing unfinishedRequest, with something like:

    ServiceCaller<S, R> updatedCallable = callback.getNextCallable();
    if (updatedCallable != null) {
      // this call is finished, so decrement. but we will kick off a new one
      // below, so don't call onComplete
      unfinishedRequest.decrementAndGet();
      onLocateComplete(stubMaker, updatedCallable, callback, new ArrayList<>(), endKey,
        endKeyInclusive, locateFinished, unfinishedRequest, loc, error);
    } else if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
      callback.onComplete();
    }

So if there is a next callable, this skips completing the callback and instead recursively kicks back into onLocateComplete() with the updated callable for the remaining results. Our AbstractAggregationCallback class would implement getNextCallable(AggregateResponse) with something like this:

    @Override
    protected ServiceCaller<AggregateService, AggregateResponse> getNextCallable(
        AggregateResponse response) {
      if (!response.isPartial()) {
        return null;
      }
      return (stub, controller, rpcCallback) -> {
        AggregateRequest.Builder updatedRequest = AggregateRequest.newBuilder(originalRequest);
        // TODO: pull whatever field we want from the aggregate response and set it in the
        // next request so that the server can continue where it left off
        stub.getMax(controller, updatedRequest.build(), rpcCallback);
      };
    }

So with this approach we need to add a getNextCallable() method to the CoprocessorCallback interface. I'm not sure this is the cleanest way to do it, but it's one way that I think would work. This does not cover the backoff portion.
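One way to add getNextCallable() to the interface without breaking existing implementors would be a default method that returns null, meaning "no follow-up, behave as today". A compilable sketch, using simplified stand-ins (not the real AsyncTable inner types, which have more parameters):

```java
public class CallbackSketch {

  /** Simplified stand-in for ServiceCaller (controller/rpcCallback args omitted). */
  @FunctionalInterface
  interface ServiceCaller<S, R> {
    void call(S stub);
  }

  /** Simplified stand-in for AsyncTable.CoprocessorCallback. */
  interface CoprocessorCallback<R> {
    void onRegionComplete(R resp);

    void onComplete();

    /**
     * Proposed addition: return a follow-up call for the same region, or null
     * if the region is fully processed. Defaulting to null preserves today's
     * single-request-per-region behavior for existing implementations.
     */
    default ServiceCaller<?, R> getNextCallable() {
      return null;
    }
  }

  public static void main(String[] args) {
    // An existing implementation compiles unchanged and inherits "no follow-up".
    CoprocessorCallback<String> legacy = new CoprocessorCallback<String>() {
      @Override
      public void onRegionComplete(String resp) {
      }

      @Override
      public void onComplete() {
      }
    };
    System.out.println(legacy.getNextCallable() == null); // prints true
  }
}
```

The default-method route matters because CoprocessorCallback is a public client-facing interface, so an abstract method would be a source-incompatible change for anyone implementing it directly.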
For that, we similarly do not want to sleep in an async client. Instead we'd want to use a HashedWheelTimer to submit the follow-up request asynchronously. For example, instead of calling onLocateComplete directly in my first snippet above, you'd call HWT.newTimeout(t -> onLocateComplete(...), backoffTime, backoffTimeUnit).

Since Duo wrote most of this code, I wonder if he has any opinions on the above recommendations or a different approach.

Also, I think onLocateComplete is sort of broken for coprocessor calls which span more than 2 regions. It looks like we only handle executing against the first region and the last region, but none of the middle ones. This should probably be filed as a bug.

On Tue, Jul 16, 2024 at 6:57 PM Charles Connell <cconn...@hubspot.com.invalid> wrote:
> Hi folks,
>
> I am considering contributing a PR that will allow the
> AggregationClient+AggregateImplementation coprocessor to respect quota
> throttling during its operations, or something in that spirit. I want
> to gather input from the community on your thoughts about this. In
> particular here are some questions:
>
> 1. Is this idea useful enough to be accepted as a contribution?
> 2. Would you prefer a change to the existing coprocessor, or a new
> coprocessor?
> 3. One way to accomplish this is for the coprocessor endpoint to
> return a partial result to the client each time it encounters an
> RpcThrottleException. The client would then sleep and send a new
> request, repeatedly, until the partial results cumulatively
> represented the full scan it wanted. Another way to accomplish this is
> for the coprocessor to sleep on the server side each time it
> encounters an RpcThrottleException, and return a single complete
> result. There are pros and cons to each of these approaches. I don't
> believe there is prior art for either partial results or
> server-side-sleeps in HBase, so both ideas feel awkward to me. What do
> you think?
> 4.
> If we go with the "partial results" idea, this seems hard to
> accomplish in an async client. Does anyone have an example of how to
> cleanly encapsulate a multi-request-response cycle of async requests
> inside a CompletableFuture? I wonder if this is maybe not possible.
>
> Thanks for your time.
> Charles
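P.S. To make the HashedWheelTimer suggestion above concrete, here's a dependency-free sketch of the "re-issue after a backoff instead of sleeping" shape. ScheduledExecutorService.schedule stands in for the client's Netty HashedWheelTimer so this runs without Netty on the classpath (newTimeout(task, delay, unit) has the same shape); the "partial until attempt 3" RPC and all names here are mine, purely for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BackoffSketch {

  // Stand-in for the connection's HashedWheelTimer (retryTimer in the real client).
  static final ScheduledExecutorService RETRY_TIMER =
      Executors.newSingleThreadScheduledExecutor();

  /** Pretend per-region RPC: returns a partial result until the third attempt. */
  static String callRegion(int attempt) {
    return attempt < 3 ? "partial" : "complete";
  }

  /** Re-issues the call after a delay; no caller thread ever sleeps. */
  static void callWithBackoff(AtomicInteger attempt, CompletableFuture<String> future) {
    String resp = callRegion(attempt.incrementAndGet());
    if (resp.equals("partial")) {
      // like HWT.newTimeout(t -> onLocateComplete(...), backoffTime, backoffTimeUnit)
      RETRY_TIMER.schedule(() -> callWithBackoff(attempt, future), 10, TimeUnit.MILLISECONDS);
    } else {
      future.complete(resp);
    }
  }

  public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<>();
    callWithBackoff(new AtomicInteger(), future);
    System.out.println(future.get()); // prints "complete" after two scheduled retries
    RETRY_TIMER.shutdown();
  }
}
```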