Pull requests are available that complete this feature: https://github.com/apache/hbase/pull/6168, https://github.com/apache/hbase/pull/6167. I would appreciate reviews from anyone interested.
On Tue, Jul 23, 2024 at 9:16 AM Charles Connell <cconn...@hubspot.com> wrote:
>
> Thank you for the feedback. I will work on the approach that Bryan suggests. I will need https://issues.apache.org/jira/browse/HBASE-28346 merged before I can put up a PR.
>
> On Sat, Jul 20, 2024 at 11:34 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:
> >
> > Ah, you are right, we can add a flag that new clients set to a non-default value.
> >
> > In that case, I prefer that we implement the 'partial result' logic. Sleeping on the server side is not a good idea.
> >
> > On Sat, Jul 20, 2024 at 23:09, Bryan Beaudreault <bbeaudrea...@apache.org> wrote:
> > >
> > > Since the protocol is protobuf, it should be quite simple. We can add a new field, supports_partial, to the AggregateRequest proto. Only new clients would set it to true, and that would enable partial results for those clients. We have a similar concept in how we handle supporting retryImmediately for multi.
> > >
> > > On Sat, Jul 20, 2024 at 9:06 AM 张铎(Duo Zhang) <palomino...@gmail.com> wrote:
> > > >
> > > > I do not think it is easy to change the current implementation to return 'partial results'.
> > > >
> > > > The current assumption of the request/response is 'send a range and the agg type' / 'return the agg result of the whole range'.
> > > >
> > > > If you want to make it possible to return earlier, I think we need to tell the client:
> > > > 1. Whether the agg result is for the whole range
> > > > 2. If not, which row the next request should start from
> > > >
> > > > So at the very least we need to add something to the response protobuf message, and since there is no version info in the request, I do not know how to do this without breaking existing clients...
> > > >
> > > > Maybe we can just deprecate the current aggregation service and introduce an aggregation v2?
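A minimal sketch of the protocol change discussed above, written in plain Java so it runs on its own. The records `AggRequest` and `AggResponse` are hypothetical stand-ins for the `AggregateRequest`/`AggregateResponse` protobuf messages, and `rowsPerCallBudget` is a hypothetical knob standing in for the point where quota throttling would interrupt the scan. The request carries a `supportsPartial` flag like the `supports_partial` field Bryan proposes; the response carries the two things Duo lists: whether the result covers the whole range (`partial`) and where the next request should start (`nextStartRow`).

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical stand-ins for the AggregateRequest/AggregateResponse protobuf messages.
record AggRequest(String startRow, String stopRow, boolean supportsPartial) {}

record AggResponse(long rowCount, boolean partial, String nextStartRow) {}

// Hypothetical row-count endpoint; rowsPerCallBudget models where throttling would hit.
final class RowCountEndpointSketch {
  private final NavigableSet<String> rows;
  private final int rowsPerCallBudget;

  RowCountEndpointSketch(List<String> rows, int rowsPerCallBudget) {
    if (rowsPerCallBudget <= 0) {
      // A zero budget would make a partial-aware client loop forever.
      throw new IllegalArgumentException("budget must be positive");
    }
    this.rows = new TreeSet<>(rows);
    this.rowsPerCallBudget = rowsPerCallBudget;
  }

  AggResponse rowCount(AggRequest req) {
    long count = 0;
    // Visit rows in [startRow, stopRow) in key order.
    for (String row : rows.subSet(req.startRow(), true, req.stopRow(), false)) {
      if (count == rowsPerCallBudget && req.supportsPartial()) {
        // "Throttled": return the count so far plus the row to resume from.
        return new AggResponse(count, true, row);
      }
      count++;
    }
    // Range exhausted, or an old client that cannot handle partial results.
    return new AggResponse(count, false, null);
  }
}
```

Because an unset protobuf `bool` field reads as false, gating on a request flag like this lets the server keep the old whole-range behavior for clients that predate the field.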
> > > >
> > > > And regarding the broken logic in onLocateComplete: Bryan, could you please provide a UT so I can verify the problem?
> > > >
> > > > Thanks.
> > > >
> > > > On Wed, Jul 17, 2024 at 23:01, Bryan Beaudreault <bbeaudrea...@apache.org> wrote:
> > > > >
> > > > > I forgot to respond to your last question.
> > > > >
> > > > > I'm looking through the implementation of the following classes/methods:
> > > > > - RawAsyncTableImpl - onLocateComplete()
> > > > > - CoprocessorServiceBuilderImpl (inner class of above) - execute()
> > > > > - CoprocessorCallback (inner interface of AsyncTable)
> > > > > - AsyncAggregationClient
> > > > >
> > > > > So we already use future chaining here. When AsyncAggregationClient is invoked, a CompletableFuture is created and stashed in the AbstractAggregationCallback, and then CoprocessorServiceBuilderImpl.execute() is called. In execute(), an async region lookup occurs, and it uses addListener to call the onLocateComplete() callback when the lookup finishes. In onLocateComplete(), it makes an async call to the callable from AsyncAggregationClient (i.e. () -> stub.rowCount()). It uses addListener again to handle the response, calling either onRegionComplete or onRegionError and decrementing an unfinishedRequest counter. The AbstractAggregationCallback in AsyncAggregationClient implements onRegionComplete, passing the AggregateResponse to aggregate(). Once unfinishedRequest reaches 0 and all regions have been located, it calls callback.onComplete(). The implementation of callback.onComplete() gets the final aggregated result and passes it into future.complete(). At this point the end caller sees their response finish.
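The bookkeeping described above (an `unfinishedRequest` counter, a `locateFinished` flag, and a final `future.complete()`) can be sketched without any HBase classes. This is a simplified model under stated assumptions, not RawAsyncTableImpl itself: the regions are known up front rather than located one at a time, and `callRegion` is a hypothetical stand-in for the per-region coprocessor RPC.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class FanOutSketch {
  /**
   * Issues one async call per region and completes the returned future once all of them
   * have finished. callRegion is a hypothetical stand-in for the per-region coprocessor RPC.
   */
  static CompletableFuture<Long> sumAcrossRegions(
      List<String> regions, Function<String, CompletableFuture<Long>> callRegion) {
    CompletableFuture<Long> future = new CompletableFuture<>();
    if (regions.isEmpty()) {
      future.complete(0L);
      return future;
    }
    AtomicLong aggregate = new AtomicLong();           // running result, like aggregate()
    AtomicInteger unfinishedRequest = new AtomicInteger();
    AtomicBoolean locateFinished = new AtomicBoolean();
    for (int i = 0; i < regions.size(); i++) {
      unfinishedRequest.incrementAndGet();             // count the call before issuing it
      if (i == regions.size() - 1) {
        locateFinished.set(true);                      // no further regions will be added
      }
      callRegion.apply(regions.get(i)).whenComplete((regionResult, error) -> {
        if (error != null) {
          future.completeExceptionally(error);         // like onRegionError(); fails fast here
        } else {
          aggregate.addAndGet(regionResult);           // like onRegionComplete()
        }
        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
          future.complete(aggregate.get());            // like onComplete(); no-op if already failed
        }
      });
    }
    return future;
  }
}
```

Incrementing the counter before issuing each call, and setting `locateFinished` before the last call goes out, means the counter reaches zero with the flag set exactly when the last outstanding call finishes.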
> > > > >
> > > > > So that explains how you can do a series of async calls before finally completing the top-level future. Unfortunately, the interfaces involved make adding the specific chaining we want a bit more complicated.
> > > > >
> > > > > In an ideal world, I think we'd be able to update AbstractAggregationCallback.onRegionComplete() to handle a new AggregateResponse.isPartial() field. In that case, it would call aggregate() as today, but then kick off a follow-up RPC that would go back through the above future chaining and eventually call aggregate() again with new data. This process would continue until isPartial() is false.
> > > > >
> > > > > The problem is that AbstractAggregationCallback has no access to the internals of RawAsyncTableImpl, or any other way to influence the flow. So the first thing we need to do is figure out a change to the interfaces that would allow a callback to kick off sub-requests. This is somewhat akin to how we handle retries in the other async callers, which I notice the CoprocessorService code does not seem to support at all.
> > > > >
> > > > > The main aspect of the flow that we need to influence is when onLocateComplete() calls callback.onComplete(), as this triggers the client to return to the end caller. One way to do it would be to do something like this right after calling onRegionComplete():
> > > > >
> > > > > // r is the response that was just passed to onRegionComplete()
> > > > > ServiceCaller<S, R> updatedCallable = callback.getNextCallable(r);
> > > > > if (updatedCallable != null) {
> > > > >   // Kick off the follow-up call before decrementing for the call that just finished.
> > > > >   // Assuming onLocateComplete() increments unfinishedRequest for the call it issues,
> > > > >   // this order keeps the counter from briefly hitting 0, which would let another
> > > > >   // region's completion call onComplete() while this region still has data to return.
> > > > >   onLocateComplete(stubMaker, updatedCallable, callback, new ArrayList<>(), endKey,
> > > > >     endKeyInclusive, locateFinished, unfinishedRequest, loc, error);
> > > > > }
> > > > > if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
> > > > >   callback.onComplete();
> > > > > }
> > > > >
> > > > > So if there is a next callable, this recursively calls back into onLocateComplete with the updated callable for the remaining results, and the callback is not completed until that follow-up finishes. Our AbstractAggregationCallback class would implement getNextCallable(AggregateResponse) with something like this:
> > > > >
> > > > > @Override
> > > > > public ServiceCaller<AggregateService, AggregateResponse> getNextCallable(
> > > > >     AggregateResponse response) {
> > > > >   if (!response.isPartial()) {
> > > > >     return null;
> > > > >   }
> > > > >   return (stub, controller, rpcCallback) -> {
> > > > >     AggregateRequest.Builder updatedRequest = AggregateRequest.newBuilder(originalRequest);
> > > > >     // TODO: copy whatever field we need from the aggregate response into the next request
> > > > >     // so that the server can continue where it left off
> > > > >     stub.getMax(controller, updatedRequest.build(), rpcCallback); // e.g. for a max aggregation
> > > > >   };
> > > > > }
> > > > >
> > > > > So with this approach we need to add a getNextCallable() method to the CoprocessorCallback interface. I'm not sure this is the cleanest way to do it, but it's one way that I think would work.
> > > > >
> > > > > This does not cover the backoff portion. For that, we similarly do not want to sleep in an async client.
> > > > > Instead, we'd want to use a HashedWheelTimer to submit the follow-up request asynchronously. For example, instead of calling onLocateComplete directly in my first snippet above, you'd call HWT.newTimeout(t -> onLocateComplete(...), backoffTime, backoffTimeUnit).
> > > > >
> > > > > Since Duo wrote most of this code, I wonder if he has any opinions on the above recommendations or a different approach. Also, I think onLocateComplete is somewhat broken for coprocessor calls that span more than 2 regions. It looks like we only execute against the first and last regions, but none of the middle ones. This should probably be filed as a bug.
> > > > >
> > > > > On Tue, Jul 16, 2024 at 6:57 PM Charles Connell <cconn...@hubspot.com.invalid> wrote:
> > > > > >
> > > > > > Hi folks,
> > > > > >
> > > > > > I am considering contributing a PR that would allow the AggregationClient+AggregateImplementation coprocessor to respect quota throttling during its operations, or something in that spirit. I want to gather the community's thoughts on this. In particular, here are some questions:
> > > > > >
> > > > > > 1. Is this idea useful enough to be accepted as a contribution?
> > > > > > 2. Would you prefer a change to the existing coprocessor, or a new coprocessor?
> > > > > > 3. One way to accomplish this is for the coprocessor endpoint to return a partial result to the client each time it encounters an RpcThrottlingException. The client would then sleep and send a new request, repeatedly, until the partial results cumulatively represented the full scan it wanted.
> > > > > >    Another way to accomplish this is for the coprocessor to sleep on the server side each time it encounters an RpcThrottlingException, and return a single complete result. There are pros and cons to each of these approaches. I don't believe there is prior art for either partial results or server-side sleeps in HBase, so both ideas feel awkward to me. What do you think?
> > > > > > 4. If we go with the "partial results" idea, this seems hard to accomplish in an async client. Does anyone have an example of how to cleanly encapsulate a multi-request-response cycle of async requests inside a CompletableFuture? I wonder whether this is possible at all.
> > > > > >
> > > > > > Thanks for your time.
> > > > > > Charles
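On question 4: a multi-request cycle can be wrapped in a single CompletableFuture by chaining each follow-up request with `thenCompose`, so the caller only ever holds the outer future. The following is a minimal sketch, assuming a hypothetical `Page` response that carries a count and a resume row (null once the range is finished) and a hypothetical `rpc` function in place of the coprocessor call. `CompletableFuture.delayedExecutor` (JDK 9+) stands in for the HashedWheelTimer suggested in the thread. In the thread's terms, `page.partial()` plays the role of `isPartial()`, and scheduling the next step plays the role of `getNextCallable()` returning a non-null callable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

final class PartialResultLoopSketch {
  /** Hypothetical partial response: a count plus the row to resume from (null when done). */
  record Page(long count, String nextStartRow) {
    boolean partial() {
      return nextStartRow != null;
    }
  }

  /**
   * Calls rpc repeatedly, starting at startRow, until a response is not partial, and returns
   * the summed count through a single future. Each follow-up waits backoffMs first, but on a
   * delayed executor rather than by sleeping a thread.
   */
  static CompletableFuture<Long> countAll(
      Function<String, CompletableFuture<Page>> rpc, String startRow, long backoffMs) {
    return step(rpc, startRow, 0L, backoffMs);
  }

  private static CompletableFuture<Long> step(
      Function<String, CompletableFuture<Page>> rpc, String startRow, long soFar, long backoffMs) {
    return rpc.apply(startRow).thenCompose(page -> {
      long total = soFar + page.count();
      if (!page.partial()) {
        return CompletableFuture.completedFuture(total); // whole range covered: done
      }
      // Stand-in for HashedWheelTimer.newTimeout(...): run the next step after the backoff.
      Executor afterBackoff = CompletableFuture.delayedExecutor(backoffMs, TimeUnit.MILLISECONDS);
      return CompletableFuture.supplyAsync(page::nextStartRow, afterBackoff)
          .thenCompose(next -> step(rpc, next, total, backoffMs));
    });
  }
}
```

The follow-up request is handed to the delayed executor, so no thread blocks during the backoff, and the end caller sees a single future that completes only after the last, non-partial response arrives.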