I do not think it is easy to change the current implementation to be
'partial results'.

The current assumption of request/response is 'send a range and the
agg type'/'return the agg result of the whole range'.

If you want to make it possible to return earlier, I think we need to
tell the client that
1. Whether the agg result is for the whole range
2. If not, next time you should start from which row

So at least we need to add something to the response protobuf message,
and since there is no version info in the request, I do not know how
to do this without breaking existing clients...

Maybe we can just deprecate the current aggregation service, and
introduce aggregation v2?

And on the broken logic for onLocateComplete, Bryan, could you please
provide a UT so I can verify the problem?

Thanks.

Bryan Beaudreault <bbeaudrea...@apache.org> 于2024年7月17日周三 23:01写道:
>
> I forgot to respond to your last question.
>
> I'm looking through the implementation of the following classes/methods:
> - RawAsyncTableImpl - onLocateComplete()
> - CoprocessorServiceBuilderImpl (inner class of above)  - execute()
> - CoprocessorCallback (inner interface of AsyncTable)
> - AsyncAggregationClient
>
> So we already use future chaining here. When AsyncAggregationClient is
> invoked, a CompletableFuture is created and stashed into the
> AbstractAggregationCallback and then
> CoprocessorServiceBuilderImpl.execute() is called. In execute(), an async
> region lookup occurs and it uses addListener to call the onLocateComplete()
> callback when that finishes. In onLocateComplete(), it makes an async call
> to the callable from AsyncAggregationClient (i.e. () -> stub.rowCount()).
> It uses addListener again to handle the response, calling either
> onRegionComplete or onRegionError and decrementing an unfinishedRequest
> counter. The AbstractAggregationCallback in AsyncAggregationClient adds an
> implementation for onRegionComplete which passes AggregateResponse to
> aggregate(). Once unfinishedRequests equals 0, it calls
> callback.onComplete(). The implementation of callback.onComplete() gets the
> final aggregated results and passes that into future.complete(). At this
> point the end-caller sees their response finish.
>
> So that explains how you can do a series of async calls before finally
> completing the top-level future. Unfortunately the interfaces involved here
> make adding the specific chaining we want to do a bit more complicated.
>
> In an ideal world I think we'd be able to
> update AbstractAggregationCallback.onRegionComplete() to handle a new
> AggregateResponse.isPartial() field. In this case, it'd call aggregate as
> today, but then kick-off a follow-up RPC which would go back through the
> above future chaining and eventually calling aggregate() again with new
> data.  This process would continue until isPartial() is false.
>
> The problem is, we don't have access to the internals of RawAsyncTableImpl
> in AbstractAggregationCallback or any other way to influence the flow. So
> the first thing we need to do is figure out a change to the interfaces
> which would allow a callback to kick-off sub-requests.  This is sort of
> akin to how we handle retries in the other async callers, which I'm
> noticing the CoprocessorService stuff does not seem to support at all.
>
> The main aspect of the flow that we need to influence is when
> onLocateComplete() calls callback.onComplete() as this triggers the client
> to return to the end-caller. One way to do it would be if right after
> calling onRegionComplete() we decrement unfinishedCalls and do something
> like:
>
> ServiceCaller<S, R> updatedCallable = callback.getNextCallable();
> if (updatedCallable != null) {
>   // this call is finished, so decrement. but we will kick off a new
> one below, so don't
>   // call onComplete
>   unfinishedRequest.decrementAndGet();
>   onLocateComplete(stubMaker, updatedCallable, callback, new
> ArrayList<>(), endKey,
>     endKeyInclusive, locateFinished, unfinishedRequest, loc, error);
> } else if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
>   callback.onComplete();
> }
>
> So if there is a next callable, this skips completing the callback and
> instead recursively kicks back into onLocateComplete with the updated
> callable for the remaining results. Our AbstractAggregationCallback class
> would implement getNextCallable(AggregateResponse) with something like this:
>
> @Override
> protected ServiceCaller<AggregateService, AggregateResponse> getNextCallable(
>   AggregateResponse response) {
>   if (!response.isPartial()) {
>     return null;
>   }
>   return (stub, controller, rpcCallback) -> {
>     AggregateRequest.Builder updatedRequest =
> AggregateRequest.newBuilder(originalRequest);
>     // todo: pull whatever field we want from the aggregate response
> and set it in the next request
>     //   so that the server can continue where it left off
>     stub.getMax(controller, updatedRequest.build(), rpcCallback);
>   };
> }
>
>
> So with this approach we need to add a getNextCallable() method to the
> CoprocessorCallback interface. I'm not sure if this is the cleanest way to
> do it, but it's one way that I think would work.
>
> This does not cover the backoff portion. For that, we similarly do not want
> to sleep in an async client. Instead we'd want to use a HashedWheelTimer to
> submit the follow-up request async. For example, instead of calling
> onLocateComplete directly in my first snippet above, you'd call
> HWT.newTimeout(t -> onLocateComplete(...)), backoffTime, backoffTimeUnit.
>
> Since Duo wrote most of this code, I wonder if he has any opinions on the
> above recommendations or a different approach. Also, I think
> onLocateComplete is sort of broken for coprocessor calls which span more
> than 2 regions. It looks like we only handle executing against the first
> region and last region, but none of the middle ones. This should probably
> be filed as a bug.
>
> On Tue, Jul 16, 2024 at 6:57 PM Charles Connell
> <cconn...@hubspot.com.invalid> wrote:
>
> > Hi folks,
> >
> > I am considering contributing a PR that will allow the
> > AggregationClient+AggregateImplementation coprocessor to respect quota
> > throttling during its operations, or something in that spirit. I want
> > to gather input from the community on your thoughts about this. In
> > particular here are some questions:
> >
> > 1. Is this idea useful enough to be accepted as a contribution?
> > 2. Would you prefer a change to the existing coprocessor, or a new
> > coprocessor?
> > 3. One way to accomplish this is for the coprocessor endpoint to
> > return a partial result to the client each time it encounters an
> > RpcThrottleException. The client would then sleep and send a new
> > request, repeatedly, until the partial results cumulatively
> > represented the full scan it wanted. Another way to accomplish this is
> > for the coprocessor to sleep on the server side each time it
> > encounters an RpcThrottleException, and return a single complete
> > result. There are pros and cons to each of these approaches. I don't
> > believe there is prior art for either partial results or
> > server-side-sleeps in HBase, so both ideas feel awkward to me. What do
> > you think?
> > 4. If we go with the "partial results" idea, this seems hard to
> > accomplish in an async client. Does anyone have an example of how to
> > cleanly encapsulate a multi-request-response cycle of async requests
> > inside a CompletableFuture? I wonder if this is maybe not possible.
> >
> > Thanks for your time.
> > Charles
> >
> >

Reply via email to