Since the protocol is protobuf, it should be quite simple. We can add a new
field supports_partial to the AggregationRequest proto. Only new clients
would set this to true, and that would trigger the partial results on the
client. We have a similar concept for how we handle supporting
retryImmediately for multi

On Sat, Jul 20, 2024 at 9:06 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:

> I do not think it is easy to change the current implementation to be
> 'partial results'.
>
> The current assumption of request/response is 'send a range and the
> agg type'/'return the agg result of the whole range'.
>
> If you want to make it possible to return earlier, I think we need to
> tell the client that
> 1. Whether the agg result is for the whole range
> 2. If not, next time you should start from which row
>
> So at least we need to add something to the response protobuf message,
> and since there is no version info in the request, I do not know how
> to do this without breaking existing clients...
>
> Maybe we can just deprecate the current aggregation service, and
> introduce aggregation v2?
>
> And on the broken logic for onLocateComplete, Bryan, could you please
> provide a UT so I can verify the problem?
>
> Thanks.
>
> Bryan Beaudreault <bbeaudrea...@apache.org> 于2024年7月17日周三 23:01写道:
> >
> > I forgot to respond to your last question.
> >
> > I'm looking through the implementation of the following classes/methods:
> > - RawAsyncTableImpl - onLocateComplete()
> > - CoprocessorServiceBuilderImpl (inner class of above)  - execute()
> > - CoprocessorCallback (inner interface of AsyncTable)
> > - AsyncAggregationClient
> >
> > So we already use future chaining here. When AsyncAggregationClient is
> > invoked, a CompletableFuture is created and stashed into the
> > AbstractAggregationCallback and then
> > CoprocessorServiceBuilderImpl.execute() is called. In execute(), an async
> > region lookup occurs and it uses addListener to call the
> onLocateComplete()
> > callback when that finishes. In onLocateComplete(), it makes an async
> call
> > to the callable from AsyncAggregationClient (i.e. () -> stub.rowCount()).
> > It uses addListener again to handle the response, calling either
> > onRegionComplete or onRegionError and decrementing an unfinishedRequest
> > counter. The AbstractAggregationCallback in AsyncAggregationClient adds
> an
> > implementation for onRegionComplete which passes AggregateResponse to
> > aggregate(). Once unfinishedRequests equals 0, it calls
> > callback.onComplete(). The implementation of callback.onComplete() gets
> the
> > final aggregated results and passes that into future.complete(). At this
> > point the end-caller sees their response finish.
> >
> > So that explains how you can do a series of async calls before finally
> > completing the top-level future. Unfortunately the interfaces involved
> here
> > make adding the specific chaining we want to do a bit more complicated.
> >
> > In an ideal world I think we'd be able to
> > update AbstractAggregationCallback.onRegionComplete() to handle a new
> > AggregateResponse.isPartial() field. In this case, it'd call aggregate as
> > today, but then kick-off a follow-up RPC which would go back through the
> > above future chaining and eventually calling aggregate() again with new
> > data.  This process would continue until isPartial() is false.
> >
> > The problem is, we don't have access to the internals of
> RawAsyncTableImpl
> > in AbstractAggregationCallback or any other way to influence the flow. So
> > the first thing we need to do is figure out a change to the interfaces
> > which would allow a callback to kick-off sub-requests.  This is sort of
> > akin to how we handle retries in the other async callers, which I'm
> > noticing the CoprocessorService stuff does not seem to support at all.
> >
> > The main aspect of the flow that we need to influence is when
> > onLocateComplete() calls callback.onComplete() as this triggers the
> client
> > to return to the end-caller. One way to do it would be if right after
> > calling onRegionComplete() we decrement unfinishedCalls and do something
> > like:
> >
> > ServiceCaller<S, R> updatedCallable = callback.getNextCallable();
> > if (updatedCallable != null) {
> >   // this call is finished, so decrement. but we will kick off a new
> > one below, so don't
> >   // call onComplete
> >   unfinishedRequest.decrementAndGet();
> >   onLocateComplete(stubMaker, updatedCallable, callback, new
> > ArrayList<>(), endKey,
> >     endKeyInclusive, locateFinished, unfinishedRequest, loc, error);
> > } else if (unfinishedRequest.decrementAndGet() == 0 &&
> locateFinished.get()) {
> >   callback.onComplete();
> > }
> >
> > So if there is a next callable, this skips completing the callback and
> > instead recursively kicks back into onLocateComplete with the updated
> > callable for the remaining results. Our AbstractAggregationCallback class
> > would implement getNextCallable(AggregateResponse) with something like
> this:
> >
> > @Override
> > protected ServiceCaller<AggregateService, AggregateResponse>
> getNextCallable(
> >   AggregateResponse response) {
> >   if (!response.isPartial()) {
> >     return null;
> >   }
> >   return (stub, controller, rpcCallback) -> {
> >     AggregateRequest.Builder updatedRequest =
> > AggregateRequest.newBuilder(originalRequest);
> >     // todo: pull whatever field we want from the aggregate response
> > and set it in the next request
> >     //   so that the server can continue where it left off
> >     stub.getMax(controller, updatedRequest.build(), rpcCallback);
> >   };
> > }
> >
> >
> > So with this approach we need to add a getNextCallable() method to the
> > CoprocessorCallback interface. I'm not sure if this is the cleanest way
> to
> > do it, but it's one way that I think would work.
> >
> > This does not cover the backoff portion. For that, we similarly do not
> want
> > to sleep in an async client. Instead we'd want to use a HashedWheelTimer
> to
> > submit the follow-up request async. For example, instead of calling
> > onLocateComplete directly in my first snippet above, you'd call
> > HWT.newTimeout(t -> onLocateComplete(...)), backoffTime, backoffTimeUnit.
> >
> > Since Duo wrote most of this code, I wonder if he has any opinions on the
> > above recommendations or a different approach. Also, I think
> > onLocateComplete is sort of broken for coprocessor calls which span more
> > than 2 regions. It looks like we only handle executing against the first
> > region and last region, but none of the middle ones. This should probably
> > be filed as a bug.
> >
> > On Tue, Jul 16, 2024 at 6:57 PM Charles Connell
> > <cconn...@hubspot.com.invalid> wrote:
> >
> > > Hi folks,
> > >
> > > I am considering contributing a PR that will allow the
> > > AggregationClient+AggregateImplementation coprocessor to respect quota
> > > throttling during its operations, or something in that spirit. I want
> > > to gather input from the community on your thoughts about this. In
> > > particular here are some questions:
> > >
> > > 1. Is this idea useful enough to be accepted as a contribution?
> > > 2. Would you prefer a change to the existing coprocessor, or a new
> > > coprocessor?
> > > 3. One way to accomplish this is for the coprocessor endpoint to
> > > return a partial result to the client each time it encounters an
> > > RpcThrottleException. The client would then sleep and send a new
> > > request, repeatedly, until the partial results cumulatively
> > > represented the full scan it wanted. Another way to accomplish this is
> > > for the coprocessor to sleep on the server side each time it
> > > encounters an RpcThrottleException, and return a single complete
> > > result. There are pros and cons to each of these approaches. I don't
> > > believe there is prior art for either partial results or
> > > server-side-sleeps in HBase, so both ideas feel awkward to me. What do
> > > you think?
> > > 4. If we go with the "partial results" idea, this seems hard to
> > > accomplish in an async client. Does anyone have an example of how to
> > > cleanly encapsulate a multi-request-response cycle of async requests
> > > inside a CompletableFuture? I wonder if this is maybe not possible.
> > >
> > > Thanks for your time.
> > > Charles
> > >
> > >
>

Reply via email to