Since the protocol is protobuf, this should be quite simple. We can add a new supports_partial field to the AggregateRequest proto. Only new clients would set it to true, and that would signal to the server that the client can handle partial results. We have a similar concept in how we handle supporting retryImmediately for multi.
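For illustration only, here is a minimal sketch of the client side under that assumption. Note that supports_partial does not exist in today's AggregateRequest proto, so the setSupportsPartial accessor below is hypothetical (it would only be generated if such a field were added), and the generated-class packages shown may differ between HBase versions.

import java.io.IOException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AggregateProtos.AggregateRequest;

public class SupportsPartialSketch {
  static AggregateRequest buildRequest(Scan scan, String interpreterClassName)
      throws IOException {
    return AggregateRequest.newBuilder()
        .setInterpreterClassName(interpreterClassName)
        .setScan(ProtobufUtil.toScan(scan))
        // Hypothetical new field: only clients that understand partial responses set
        // this, so the server never returns a partial result to an old client.
        .setSupportsPartial(true)
        .build();
  }
}

A server that never sees the flag would keep today's behavior, which is what protects existing clients.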
On Sat, Jul 20, 2024 at 9:06 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:
> I do not think it is easy to change the current implementation to be 'partial results'.
>
> The current assumption of request/response is 'send a range and the agg type' / 'return the agg result of the whole range'.
>
> If you want to make it possible to return earlier, I think we need to tell the client:
> 1. Whether the agg result is for the whole range
> 2. If not, which row the next request should start from
>
> So at least we need to add something to the response protobuf message, and since there is no version info in the request, I do not know how to do this without breaking existing clients...
>
> Maybe we can just deprecate the current aggregation service and introduce an aggregation v2?
>
> And on the broken logic for onLocateComplete, Bryan, could you please provide a UT so I can verify the problem?
>
> Thanks.
>
> On Wed, Jul 17, 2024 at 23:01, Bryan Beaudreault <bbeaudrea...@apache.org> wrote:
> >
> > I forgot to respond to your last question.
> >
> > I'm looking through the implementation of the following classes/methods:
> > - RawAsyncTableImpl - onLocateComplete()
> > - CoprocessorServiceBuilderImpl (inner class of the above) - execute()
> > - CoprocessorCallback (inner interface of AsyncTable)
> > - AsyncAggregationClient
> >
> > So we already use future chaining here. When AsyncAggregationClient is invoked, a CompletableFuture is created and stashed into the AbstractAggregationCallback, and then CoprocessorServiceBuilderImpl.execute() is called. In execute(), an async region lookup occurs, and it uses addListener to call the onLocateComplete() callback when that finishes. In onLocateComplete(), it makes an async call to the callable from AsyncAggregationClient (i.e. () -> stub.rowCount()). It uses addListener again to handle the response, calling either onRegionComplete or onRegionError and decrementing an unfinishedRequest counter. The AbstractAggregationCallback in AsyncAggregationClient adds an implementation of onRegionComplete which passes the AggregateResponse to aggregate(). Once unfinishedRequests equals 0, it calls callback.onComplete(). The implementation of callback.onComplete() gets the final aggregated result and passes it into future.complete(). At this point the end-caller sees their response finish.
> >
> > So that explains how you can do a series of async calls before finally completing the top-level future. Unfortunately, the interfaces involved here make adding the specific chaining we want a bit more complicated.
> >
> > In an ideal world, I think we'd be able to update AbstractAggregationCallback.onRegionComplete() to handle a new AggregateResponse.isPartial() field. In this case, it would call aggregate() as it does today, but then kick off a follow-up RPC which would go back through the above future chaining and eventually call aggregate() again with new data. This process would continue until isPartial() is false.
> >
> > The problem is, we don't have access to the internals of RawAsyncTableImpl in AbstractAggregationCallback, or any other way to influence the flow. So the first thing we need to do is figure out a change to the interfaces which would allow a callback to kick off sub-requests. This is sort of akin to how we handle retries in the other async callers, which I'm noticing the CoprocessorService stuff does not seem to support at all.
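To make that loop concrete (it also speaks to question 4 at the bottom of the thread), here is a generic, self-contained sketch of wrapping a multi-request cycle inside a single CompletableFuture. It deliberately avoids the real HBase interfaces: sendOnce(), PartialResult, isPartial and nextRow are hypothetical stand-ins for the per-region RPC and the proposed AggregateResponse fields, and a Java record is used for brevity.

import java.util.concurrent.CompletableFuture;

public class ChainedPartialCallsSketch {

  /** Hypothetical stand-in for AggregateResponse with the proposed partial-result fields. */
  record PartialResult(long count, boolean isPartial, byte[] nextRow) {}

  /** Hypothetical stand-in for one async RPC that may stop early when throttled. */
  static CompletableFuture<PartialResult> sendOnce(byte[] startRow) {
    return CompletableFuture.completedFuture(new PartialResult(42, false, null));
  }

  /** Re-issues sendOnce until the server reports a complete result; nothing ever blocks. */
  static void aggregateFrom(byte[] startRow, long runningTotal, CompletableFuture<Long> future) {
    sendOnce(startRow).whenComplete((result, error) -> {
      if (error != null) {
        future.completeExceptionally(error);
      } else if (result.isPartial()) {
        // Partial result: fold it in and chain a follow-up call for the remaining rows.
        aggregateFrom(result.nextRow(), runningTotal + result.count(), future);
      } else {
        future.complete(runningTotal + result.count());
      }
    });
  }

  public static void main(String[] args) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    aggregateFrom(null, 0L, future);
    future.thenAccept(total -> System.out.println("row count = " + total));
  }
}

In the real client this recursion would live behind the callback/interface changes discussed below rather than in a standalone helper.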
> >
> > The main aspect of the flow that we need to influence is when onLocateComplete() calls callback.onComplete(), as this triggers the client to return to the end-caller. One way to do it would be if, right after calling onRegionComplete(), we decrement unfinishedRequest and do something like:
> >
> > ServiceCaller<S, R> updatedCallable = callback.getNextCallable(resp);
> > if (updatedCallable != null) {
> >   // this call is finished, so decrement, but we will kick off a new one below,
> >   // so don't call onComplete. resp is the response that was just passed to
> >   // onRegionComplete above.
> >   unfinishedRequest.decrementAndGet();
> >   onLocateComplete(stubMaker, updatedCallable, callback, new ArrayList<>(), endKey,
> >     endKeyInclusive, locateFinished, unfinishedRequest, loc, error);
> > } else if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
> >   callback.onComplete();
> > }
> >
> > So if there is a next callable, this skips completing the callback and instead recursively kicks back into onLocateComplete with the updated callable for the remaining results. Our AbstractAggregationCallback class would implement getNextCallable(AggregateResponse) with something like this:
> >
> > @Override
> > public ServiceCaller<AggregateService, AggregateResponse> getNextCallable(
> >     AggregateResponse response) {
> >   if (!response.isPartial()) {
> >     return null;
> >   }
> >   return (stub, controller, rpcCallback) -> {
> >     AggregateRequest.Builder updatedRequest = AggregateRequest.newBuilder(originalRequest);
> >     // TODO: pull whatever field we want from the aggregate response and set it in
> >     // the next request so that the server can continue where it left off
> >     stub.getMax(controller, updatedRequest.build(), rpcCallback);
> >   };
> > }
> >
> > So with this approach we need to add a getNextCallable() method to the CoprocessorCallback interface. I'm not sure if this is the cleanest way to do it, but it's one way that I think would work.
> >
> > This does not cover the backoff portion. For that, we similarly do not want to sleep in an async client. Instead, we'd want to use a HashedWheelTimer to submit the follow-up request asynchronously. For example, instead of calling onLocateComplete directly in my first snippet above, you'd call HWT.newTimeout(t -> onLocateComplete(...), backoffTime, backoffTimeUnit).
> >
> > Since Duo wrote most of this code, I wonder if he has any opinions on the above recommendations or a different approach. Also, I think onLocateComplete is sort of broken for coprocessor calls which span more than two regions. It looks like we only handle executing against the first region and the last region, but none of the middle ones. This should probably be filed as a bug.
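To illustrate the backoff idea without blocking any thread, here is a bare-bones sketch using netty's HashedWheelTimer. HBase bundles a shaded org.apache.hbase.thirdparty variant, and the async client already keeps a retry timer internally that a real patch would presumably reuse, so retryAfterBackoff() is just a hypothetical helper for illustration.

import java.util.concurrent.TimeUnit;

import io.netty.util.HashedWheelTimer;

public class BackoffSketch {
  // A real patch would reuse the connection's shared retry timer instead of creating one.
  private static final HashedWheelTimer TIMER = new HashedWheelTimer();

  /** Schedules the follow-up RPC on the timer instead of sleeping the calling thread. */
  static void retryAfterBackoff(Runnable followUpCall, long backoffMillis) {
    TIMER.newTimeout(timeout -> followUpCall.run(), backoffMillis, TimeUnit.MILLISECONDS);
  }

  public static void main(String[] args) throws InterruptedException {
    retryAfterBackoff(() -> System.out.println("re-issuing aggregation RPC"), 100);
    Thread.sleep(200); // give the timer a chance to fire before stopping it
    TIMER.stop();
  }
}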
> >
> > On Tue, Jul 16, 2024 at 6:57 PM Charles Connell <cconn...@hubspot.com.invalid> wrote:
> > >
> > > Hi folks,
> > >
> > > I am considering contributing a PR that will allow the AggregationClient + AggregateImplementation coprocessor to respect quota throttling during its operations, or something in that spirit. I want to gather input from the community on this. In particular, here are some questions:
> > >
> > > 1. Is this idea useful enough to be accepted as a contribution?
> > > 2. Would you prefer a change to the existing coprocessor, or a new coprocessor?
> > > 3. One way to accomplish this is for the coprocessor endpoint to return a partial result to the client each time it encounters an RpcThrottleException. The client would then sleep and send a new request, repeatedly, until the partial results cumulatively represented the full scan it wanted. Another way is for the coprocessor to sleep on the server side each time it encounters an RpcThrottleException and return a single complete result. There are pros and cons to each of these approaches. I don't believe there is prior art for either partial results or server-side sleeps in HBase, so both ideas feel awkward to me. What do you think?
> > > 4. If we go with the "partial results" idea, this seems hard to accomplish in an async client. Does anyone have an example of how to cleanly encapsulate a multi-request-response cycle of async requests inside a CompletableFuture? I wonder if this is maybe not possible.
> > >
> > > Thanks for your time.
> > > Charles