krconv commented on code in PR #7131:
URL: https://github.com/apache/hbase/pull/7131#discussion_r2173539931


##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -944,4 +1001,59 @@ public <S, R> CoprocessorServiceBuilder<S, R> 
coprocessorService(
     PartialResultCoprocessorCallback<S, R> callback) {
     return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
   }
+
+  private static class MultiRegionCoprocessorServiceProgress<R>{

Review Comment:
   The state needed to manage all the individual RPCs to regions was being 
passed around as parameters. This moves that state into this helper class, along 
with some additional state.
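
A minimal sketch of the idea, not the class from this PR: the two counters
that used to be separate parameters (`locateFinished`, `unfinishedRequest`)
sit next to the extra per-region bookkeeping. Region names are plain strings
here, and `ProgressSketch`, `onRequestSent`, `markLocateFinished`,
`markResponseStarted` and `onRequestFinished` are hypothetical names chosen
for illustration. Only `markNewRegionAndCheckNeedsToBeHandled` and
`hasResponseStarted` mirror method names visible in the diff, and their
bodies are assumptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the helper class; not the HBase implementation.
final class ProgressSketch {
  // Previously passed around as separate method parameters.
  private final AtomicBoolean locateFinished = new AtomicBoolean(false);
  private final AtomicInteger unfinishedRequests = new AtomicInteger(0);
  // Additional state (assumed): which regions have already produced a
  // response, and which replacement regions have already been picked up.
  private final Set<String> regionsWithResponses = ConcurrentHashMap.newKeySet();
  private final Set<String> handledNewRegions = ConcurrentHashMap.newKeySet();

  void onRequestSent() {
    unfinishedRequests.incrementAndGet();
  }

  void markLocateFinished() {
    locateFinished.set(true);
  }

  void markResponseStarted(String region) {
    regionsWithResponses.add(region);
  }

  boolean hasResponseStarted(String region) {
    return regionsWithResponses.contains(region);
  }

  // Returns true only for the first caller that reports a given new region.
  boolean markNewRegionAndCheckNeedsToBeHandled(String newRegion) {
    return handledNewRegions.add(newRegion);
  }

  // Returns true when this was the last outstanding request and region
  // location has finished, i.e. the whole multi-region call is complete.
  boolean onRequestFinished() {
    return unfinishedRequests.decrementAndGet() == 0 && locateFinished.get();
  }
}
```

Keeping this state in one object means the per-region RPC methods take a
single `progress` argument, as in the new `coprocessorServiceUntilComplete`
signature shown in the diff.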



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,52 +797,102 @@ private boolean locateFinished(RegionInfo region, byte[] 
endKey, boolean endKeyI
 
   private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> 
stubMaker,
     ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> 
callback,
-    AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo 
region, Span span) {
+    MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region, Span 
span) {
     addListener(coprocessorService(stubMaker, callable, region, 
region.getStartKey()), (r, e) -> {
       try (Scope ignored = span.makeCurrent()) {
-        if (e != null) {
-          callback.onRegionError(region, e);
-        } else {
-          callback.onRegionComplete(region, r);
-        }
+        if (e instanceof RegionNameChangedException) {
+          RegionInfo newRegion = 
((RegionNameChangedException)e).getNewRegionInfo();
+          if (progress.markNewRegionAndCheckNeedsToBeHandled(newRegion)) {
+            if (progress.hasResponseStarted(region)) {
+              // already started sending responses for the original region, so 
we can't switch to
+              // the new regions because that would violate the contract of 
the callbacks
+              progress.onResponse(region, null, new DoNotRetryIOException(
+                "Region " + region.getEncodedName() + " no longer exists, 
likely due to a split or"
+                  + " merge, and the coprocessor service was already in 
progress and can't be recovered"
+              ));
+            } else {
+              LOG.debug("Attempted to send a coprocessor service RPC to region 
{} which no"
+                  + " longer exists, will attempt to send RPCs to the 
region(s) that replaced it",
+                region.getEncodedName());
+              restartCoprocessorServiceForRange(stubMaker, callable, callback, 
region.getStartKey(),

Review Comment:
   This is where a new call is submitted for the range covered by the 
region that was split or merged.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java:
##########
@@ -78,8 +83,7 @@ private CompletableFuture<Message> rpcCall(MethodDescriptor 
method, Message requ
     final Context context = Context.current();
     CompletableFuture<Message> future = new CompletableFuture<>();
     if (region != null && !Bytes.equals(loc.getRegion().getRegionName(), 
region.getRegionName())) {
-      future.completeExceptionally(new DoNotRetryIOException("Region name is 
changed, expected "
-        + region.getRegionNameAsString() + ", actual " + 
loc.getRegion().getRegionNameAsString()));
+      future.completeExceptionally(new 
RegionNameChangedException(loc.getRegion()));

Review Comment:
   This works, but I wonder if there is a better place in the client to detect 
this scenario.



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,52 +797,102 @@ private boolean locateFinished(RegionInfo region, byte[] 
endKey, boolean endKeyI
 
   private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S> 
stubMaker,
     ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> 
callback,
-    AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo 
region, Span span) {
+    MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region, Span 
span) {
     addListener(coprocessorService(stubMaker, callable, region, 
region.getStartKey()), (r, e) -> {
       try (Scope ignored = span.makeCurrent()) {
-        if (e != null) {
-          callback.onRegionError(region, e);
-        } else {
-          callback.onRegionComplete(region, r);
-        }
+        if (e instanceof RegionNameChangedException) {
+          RegionInfo newRegion = 
((RegionNameChangedException)e).getNewRegionInfo();
+          if (progress.markNewRegionAndCheckNeedsToBeHandled(newRegion)) {
+            if (progress.hasResponseStarted(region)) {
+              // already started sending responses for the original region, so 
we can't switch to
+              // the new regions because that would violate the contract of 
the callbacks
+              progress.onResponse(region, null, new DoNotRetryIOException(
+                "Region " + region.getEncodedName() + " no longer exists, 
likely due to a split or"
+                  + " merge, and the coprocessor service was already in 
progress and can't be recovered"
+              ));

Review Comment:
   In this case, `CoprocessorCallback::onRegionComplete` has already been 
called at least once for this region, and I think it'd be better to throw an 
error in this edge case instead of trying to make the `CoprocessorCallback` 
handle this scenario. For example, suppose we had already sent these 
callbacks:
   ```
   callback.onRegionComplete(r1, { sum => 10, next => 0x01 });
   callback.onRegionComplete(r1, { sum => 7, next => 0x02 });
   [r1 is split into r2 and r3]
   ```
   How would we signal to the caller that the results for region r1 are 
no longer relevant, and that they should now consider those results 
replaced by r2 and r3? Again, I think keeping the interface as is and 
continuing to throw an error in this edge case would be better, but I'm open to 
other thoughts.
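
The branch being discussed can be summarized with a small sketch. This is an
illustration under assumptions, not the PR's code: `RegionChangeDecision`,
`Action` and `decide` are hypothetical names, and the `ALREADY_HANDLED` case
is a guess at what happens when `markNewRegionAndCheckNeedsToBeHandled`
returns false, since the quoted diff does not show that branch.

```java
// Hypothetical names throughout; this is not the HBase client code.
final class RegionChangeDecision {
  enum Action { RESTART_FOR_RANGE, FAIL_REGION, ALREADY_HANDLED }

  // needsToBeHandled: result of markNewRegionAndCheckNeedsToBeHandled(newRegion)
  // responseStarted:  result of hasResponseStarted(originalRegion)
  static Action decide(boolean needsToBeHandled, boolean responseStarted) {
    if (!needsToBeHandled) {
      // Assumption: another RPC already reported this replacement region.
      return Action.ALREADY_HANDLED;
    }
    // Once callbacks have been delivered for the original region, switching
    // to the replacement regions would break the callback contract, so the
    // region fails; otherwise the range can be resubmitted.
    return responseStarted ? Action.FAIL_REGION : Action.RESTART_FOR_RANGE;
  }
}
```

In the r1 example above, two partial results were already delivered, so the
sketch yields `FAIL_REGION`. Had the split been detected before the first
callback, it would yield `RESTART_FOR_RANGE`.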



##########
hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncTableCoprocessorEndpoint.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.ServiceCaller;
+import org.apache.hadoop.hbase.client.Table;
+import 
org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
+import 
org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
+import 
org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
+import 
org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class, CoprocessorTests.class })
+public class TestAsyncTableCoprocessorEndpoint {

Review Comment:
   We do have other async coprocessor endpoint tests, but the only one I could 
find (`TestAsyncCoprocessorEndpoint`) is for the admin client


