Pull requests that complete this feature are now available:
https://github.com/apache/hbase/pull/6168 and
https://github.com/apache/hbase/pull/6167. I would appreciate reviews
from anyone interested.

On Tue, Jul 23, 2024 at 9:16 AM Charles Connell <cconn...@hubspot.com> wrote:
>
> Thank you for the feedback. I will work on the approach that Bryan
> suggests. I will need
> https://issues.apache.org/jira/browse/HBASE-28346 merged before I can
> put up a PR.
>
> On Sat, Jul 20, 2024 at 11:34 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:
> >
> > Ah, you are right, we can add a flag to let the new client set it to a
> > non-default value.
> >
> > In that case I prefer that we implement the 'partial result' logic.
> > Sleeping on the server side is not a good idea.
> >
> > On Sat, Jul 20, 2024 at 11:09 PM Bryan Beaudreault <bbeaudrea...@apache.org> wrote:
> > >
> > > Since the protocol is protobuf, it should be quite simple. We can add a
> > > new field, supports_partial, to the AggregationRequest proto. Only new
> > > clients would set this to true, and that would trigger the partial
> > > results on the client. We have a similar concept for how we handle
> > > supporting retryImmediately for multi.
> > >
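
To make the flag idea concrete, here is a minimal sketch of how a server
could gate partial results on a client-supplied flag. Request, Response,
and handle() are hypothetical stand-ins written for this note, not the
real protobuf classes or the coprocessor endpoint, and the row "budget"
is just an assumed way to simulate throttling.

```java
/** Hypothetical stand-ins for the request/response messages; not HBase classes. */
final class PartialFlagSketch {

  static final class Request {
    // Assumed new optional field; an old client never sets it, so it reads as false.
    final boolean supportsPartial;

    Request(boolean supportsPartial) {
      this.supportsPartial = supportsPartial;
    }
  }

  static final class Response {
    final long rowCount;   // aggregate over the rows actually covered
    final boolean partial; // assumed new field: true if the range was not finished
    final int nextRow;     // assumed new field: where to resume, -1 when complete

    Response(long rowCount, boolean partial, int nextRow) {
      this.rowCount = rowCount;
      this.partial = partial;
      this.nextRow = nextRow;
    }
  }

  /**
   * Simulated endpoint counting rows in [startRow, totalRows). It stops after
   * `budget` rows only when the client has declared that it understands
   * partial results; otherwise it covers the whole range, as it does today.
   */
  static Response handle(Request req, int startRow, int totalRows, int budget) {
    int end = req.supportsPartial ? Math.min(totalRows, startRow + budget) : totalRows;
    boolean partial = end < totalRows;
    return new Response(end - startRow, partial, partial ? end : -1);
  }
}
```

With this shape an old client keeps getting whole-range answers, and only
a client that opts in ever sees partial set to true.
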
> > > On Sat, Jul 20, 2024 at 9:06 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:
> > >
> > > > I do not think it is easy to change the current implementation to be
> > > > 'partial results'.
> > > >
> > > > The current assumption of request/response is 'send a range and the
> > > > agg type'/'return the agg result of the whole range'.
> > > >
> > > > If you want to make it possible to return earlier, I think we need to
> > > > tell the client:
> > > > 1. Whether the agg result is for the whole range
> > > > 2. If not, which row the next request should start from
> > > >
> > > > So at least we need to add something to the response protobuf message,
> > > > and since there is no version info in the request, I do not know how
> > > > to do this without breaking existing clients...
> > > >
> > > > Maybe we can just deprecate the current aggregation service, and
> > > > introduce aggregation v2?
> > > >
> > > > And on the broken logic for onLocateComplete, Bryan, could you please
> > > > provide a UT so I can verify the problem?
> > > >
> > > > Thanks.
> > > >
> > > > On Wed, Jul 17, 2024 at 11:01 PM Bryan Beaudreault <bbeaudrea...@apache.org> wrote:
> > > > >
> > > > > I forgot to respond to your last question.
> > > > >
> > > > > I'm looking through the implementation of the following
> > > > > classes/methods:
> > > > > - RawAsyncTableImpl - onLocateComplete()
> > > > > - CoprocessorServiceBuilderImpl (inner class of above) - execute()
> > > > > - CoprocessorCallback (inner interface of AsyncTable)
> > > > > - AsyncAggregationClient
> > > > >
> > > > > So we already use future chaining here. When AsyncAggregationClient is
> > > > > invoked, a CompletableFuture is created and stashed into the
> > > > > AbstractAggregationCallback, and then
> > > > > CoprocessorServiceBuilderImpl.execute() is called. In execute(), an
> > > > > async region lookup occurs and it uses addListener to call the
> > > > > onLocateComplete() callback when that finishes. In onLocateComplete(),
> > > > > it makes an async call to the callable from AsyncAggregationClient
> > > > > (i.e. () -> stub.rowCount()). It uses addListener again to handle the
> > > > > response, calling either onRegionComplete or onRegionError and
> > > > > decrementing an unfinishedRequest counter. The
> > > > > AbstractAggregationCallback in AsyncAggregationClient adds an
> > > > > implementation for onRegionComplete which passes AggregateResponse to
> > > > > aggregate(). Once unfinishedRequest equals 0, it calls
> > > > > callback.onComplete(). The implementation of callback.onComplete()
> > > > > gets the final aggregated results and passes them into
> > > > > future.complete(). At this point the end-caller sees their response
> > > > > finish.
> > > > >
> > > > > So that explains how you can do a series of async calls before finally
> > > > > completing the top-level future. Unfortunately the interfaces involved
> > > > > here make adding the specific chaining we want to do a bit more
> > > > > complicated.
> > > > >
> > > > > In an ideal world I think we'd be able to update
> > > > > AbstractAggregationCallback.onRegionComplete() to handle a new
> > > > > AggregateResponse.isPartial() field. In this case, it'd call aggregate
> > > > > as today, but then kick off a follow-up RPC which would go back
> > > > > through the above future chaining and eventually call aggregate()
> > > > > again with new data. This process would continue until isPartial() is
> > > > > false.
> > > > >
> > > > > The problem is, we don't have access to the internals of
> > > > > RawAsyncTableImpl in AbstractAggregationCallback or any other way to
> > > > > influence the flow. So the first thing we need to do is figure out a
> > > > > change to the interfaces which would allow a callback to kick off
> > > > > sub-requests. This is sort of akin to how we handle retries in the
> > > > > other async callers, which I'm noticing the CoprocessorService stuff
> > > > > does not seem to support at all.
> > > > >
> > > > > The main aspect of the flow that we need to influence is when
> > > > > onLocateComplete() calls callback.onComplete(), as this triggers the
> > > > > client to return to the end-caller. One way to do it would be if right
> > > > > after calling onRegionComplete() we decrement unfinishedRequest and do
> > > > > something like:
> > > > >
> > > > > ServiceCaller<S, R> updatedCallable = callback.getNextCallable();
> > > > > if (updatedCallable != null) {
> > > > >   // this call is finished, so decrement. but we will kick off a new
> > > > >   // one below, so don't call onComplete
> > > > >   unfinishedRequest.decrementAndGet();
> > > > >   onLocateComplete(stubMaker, updatedCallable, callback,
> > > > >     new ArrayList<>(), endKey, endKeyInclusive, locateFinished,
> > > > >     unfinishedRequest, loc, error);
> > > > > } else if (unfinishedRequest.decrementAndGet() == 0
> > > > >     && locateFinished.get()) {
> > > > >   callback.onComplete();
> > > > > }
> > > > >
> > > > > So if there is a next callable, this skips completing the callback and
> > > > > instead recursively kicks back into onLocateComplete with the updated
> > > > > callable for the remaining results. Our AbstractAggregationCallback
> > > > > class would implement getNextCallable(AggregateResponse) with
> > > > > something like this:
> > > > >
> > > > > @Override
> > > > > protected ServiceCaller<AggregateService, AggregateResponse>
> > > > >     getNextCallable(AggregateResponse response) {
> > > > >   if (!response.isPartial()) {
> > > > >     return null;
> > > > >   }
> > > > >   return (stub, controller, rpcCallback) -> {
> > > > >     AggregateRequest.Builder updatedRequest =
> > > > >       AggregateRequest.newBuilder(originalRequest);
> > > > >     // todo: pull whatever field we want from the aggregate response
> > > > >     //   and set it in the next request so that the server can
> > > > >     //   continue where it left off
> > > > >     stub.getMax(controller, updatedRequest.build(), rpcCallback);
> > > > >   };
> > > > > }
> > > > >
> > > > >
> > > > > So with this approach we need to add a getNextCallable() method to the
> > > > > CoprocessorCallback interface. I'm not sure if this is the cleanest
> > > > > way to do it, but it's one way that I think would work.
> > > > >
> > > > > This does not cover the backoff portion. For that, we similarly do not
> > > > > want to sleep in an async client. Instead we'd want to use a
> > > > > HashedWheelTimer to submit the follow-up request async. For example,
> > > > > instead of calling onLocateComplete directly in my first snippet
> > > > > above, you'd call
> > > > > HWT.newTimeout(t -> onLocateComplete(...), backoffTime,
> > > > > backoffTimeUnit).
> > > > >
> > > > > Since Duo wrote most of this code, I wonder if he has any opinions on
> > > > > the above recommendations or a different approach. Also, I think
> > > > > onLocateComplete is sort of broken for coprocessor calls which span
> > > > > more than 2 regions. It looks like we only handle executing against
> > > > > the first region and last region, but none of the middle ones. This
> > > > > should probably be filed as a bug.
> > > > >
> > > > > On Tue, Jul 16, 2024 at 6:57 PM Charles Connell
> > > > > <cconn...@hubspot.com.invalid> wrote:
> > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I am considering contributing a PR that will allow the
> > > > > > AggregationClient+AggregateImplementation coprocessor to respect
> > > > > > quota throttling during its operations, or something in that
> > > > > > spirit. I want to gather input from the community on your thoughts
> > > > > > about this. In particular here are some questions:
> > > > > >
> > > > > > 1. Is this idea useful enough to be accepted as a contribution?
> > > > > > 2. Would you prefer a change to the existing coprocessor, or a new
> > > > > > coprocessor?
> > > > > > 3. One way to accomplish this is for the coprocessor endpoint to
> > > > > > return a partial result to the client each time it encounters an
> > > > > > RpcThrottleException. The client would then sleep and send a new
> > > > > > request, repeatedly, until the partial results cumulatively
> > > > > > represented the full scan it wanted. Another way to accomplish this
> > > > > > is for the coprocessor to sleep on the server side each time it
> > > > > > encounters an RpcThrottleException, and return a single complete
> > > > > > result. There are pros and cons to each of these approaches. I don't
> > > > > > believe there is prior art for either partial results or
> > > > > > server-side-sleeps in HBase, so both ideas feel awkward to me. What
> > > > > > do you think?
> > > > > > 4. If we go with the "partial results" idea, this seems hard to
> > > > > > accomplish in an async client. Does anyone have an example of how to
> > > > > > cleanly encapsulate a multi-request-response cycle of async requests
> > > > > > inside a CompletableFuture? I wonder if this is maybe not possible.
> > > > > >
> > > > > > Thanks for your time.
> > > > > > Charles
> > > > > >
> > > > > >
> > > >
> >
