Apache9 commented on code in PR #6167:
URL: https://github.com/apache/hbase/pull/6167#discussion_r1752057657


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java:
##########
@@ -744,4 +751,10 @@ default CoprocessorServiceBuilder<S, R> toRow(byte[] 
endKey) {
    */
   <S, R> CoprocessorServiceBuilder<S, R> 
coprocessorService(Function<RpcChannel, S> stubMaker,
     ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);
+
+  /**
+   * Similar to above. Use when your coprocessor client+endpoint supports 
partial results.
+   */
+  <S, R> CoprocessorServiceBuilder<S, R> 
coprocessorService(Function<RpcChannel, S> stubMaker,

Review Comment:
   What if we call this method but the endpoint does not support partial 
results?



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java:
##########
@@ -676,6 +676,13 @@ interface CoprocessorCallback<R> {
     void onError(Throwable error);
   }
 
+  @InterfaceAudience.Public
+  interface PartialResultCoprocessorCallback<S, R> extends 
CoprocessorCallback<R> {

Review Comment:
   Could we add some javadoc about how to make use of this interface, like the 
CoprocessorCallback above? This is an IA.Public interface...



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java:
##########
@@ -676,6 +676,13 @@ interface CoprocessorCallback<R> {
     void onError(Throwable error);
   }
 
+  @InterfaceAudience.Public
+  interface PartialResultCoprocessorCallback<S, R> extends 
CoprocessorCallback<R> {
+    ServiceCaller<S, R> getNextCallable(R response, RegionInfo region);

Review Comment:
   Better also add javadoc to explain how our users should do in these two 
methods when implementing the callback.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java:
##########
@@ -297,49 +297,69 @@ public <S, R> CompletableFuture<R> 
coprocessorService(Function<RpcChannel, S> st
   public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
     Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
     CoprocessorCallback<R> callback) {
+    return coprocessorService(stubMaker, callable,
+      (PartialResultCoprocessorCallback<S, R>) callback);

Review Comment:
   Is it safe to directly cast here?



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -792,9 +792,9 @@ private boolean locateFinished(RegionInfo region, byte[] 
endKey, boolean endKeyI
   }
 
   private <S, R> void onLocateComplete(Function<RpcChannel, S> stubMaker,
-    ServiceCaller<S, R> callable, CoprocessorCallback<R> callback, 
List<HRegionLocation> locs,

Review Comment:
   Seems the locs is unused? Strange. Thanks for cleaning this up.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -822,7 +822,30 @@ private <S, R> void onLocateComplete(Function<RpcChannel, 
S> stubMaker,
         } else {
           callback.onRegionComplete(region, r);
         }
-        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+
+        boolean complete = unfinishedRequest.decrementAndGet() == 0 && 
locateFinished.get();

Review Comment:
   The order is a bit strange here...
   
   I think we should first check whether we still need to send a new request to 
the same region? If so, we just do not need to decrement the unfinishedRequest, 
as we will send a new request again.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java:
##########
@@ -297,49 +297,69 @@ public <S, R> CompletableFuture<R> 
coprocessorService(Function<RpcChannel, S> st
   public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
     Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
     CoprocessorCallback<R> callback) {
+    return coprocessorService(stubMaker, callable,
+      (PartialResultCoprocessorCallback<S, R>) callback);
+  }
+
+  @Override
+  public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
+    Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
+    PartialResultCoprocessorCallback<S, R> callback) {
     final Context context = Context.current();
-    CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {
+    PartialResultCoprocessorCallback<S, R> wrappedCallback =
+      new PartialResultCoprocessorCallback<S, R>() {
+
+        private final Phaser regionCompletesInProgress = new Phaser(1);

Review Comment:
   I think this one will be included in another PR?



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -822,7 +822,30 @@ private <S, R> void onLocateComplete(Function<RpcChannel, 
S> stubMaker,
         } else {
           callback.onRegionComplete(region, r);
         }
-        if (unfinishedRequest.decrementAndGet() == 0 && locateFinished.get()) {
+
+        boolean complete = unfinishedRequest.decrementAndGet() == 0 && 
locateFinished.get();
+
+        if (e == null && r != null) {
+          ServiceCaller<S, R> updatedCallable = callback.getNextCallable(r, 
region);
+          if (updatedCallable != null) {
+            // We are launching a new request now, so un-set complete
+            complete = false;
+            long waitIntervalMs = callback.getWaitIntervalMs(r, region);
+            LOG.trace("Coprocessor returned incomplete result. "
+              + "Sleeping for {} millis before making follow-up request.", 
waitIntervalMs);
+            if (waitIntervalMs > 0) {
+              AsyncConnectionImpl.RETRY_TIMER.newTimeout(
+                (timeout) -> onLocateComplete(stubMaker, updatedCallable, 
callback, endKey,
+                  endKeyInclusive, locateFinished, unfinishedRequest, loc, 
null),
+                waitIntervalMs, TimeUnit.MILLISECONDS);
+            } else {
+              onLocateComplete(stubMaker, updatedCallable, callback, endKey, 
endKeyInclusive,

Review Comment:
   I think we'd better do another abstraction on the send request part, since 
here we just send a new request to the same region, we do not need to determine 
whether locate is finished?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to