krconv commented on code in PR #7131:
URL: https://github.com/apache/hbase/pull/7131#discussion_r2173539931
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -944,4 +1001,59 @@ public <S, R> CoprocessorServiceBuilder<S, R>
coprocessorService(
PartialResultCoprocessorCallback<S, R> callback) {
return new CoprocessorServiceBuilderImpl<>(stubMaker, callable, callback);
}
+
+ private static class MultiRegionCoprocessorServiceProgress<R>{
Review Comment:
The state needed to manage all the individual RPCs to regions was being
passed around as parameters; this change moves it into this helper class, along
with some additional state.
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,52 +797,102 @@ private boolean locateFinished(RegionInfo region, byte[]
endKey, boolean endKeyI
private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S>
stubMaker,
ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R>
callback,
- AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo
region, Span span) {
+ MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region, Span
span) {
addListener(coprocessorService(stubMaker, callable, region,
region.getStartKey()), (r, e) -> {
try (Scope ignored = span.makeCurrent()) {
- if (e != null) {
- callback.onRegionError(region, e);
- } else {
- callback.onRegionComplete(region, r);
- }
+ if (e instanceof RegionNameChangedException) {
+ RegionInfo newRegion =
((RegionNameChangedException)e).getNewRegionInfo();
+ if (progress.markNewRegionAndCheckNeedsToBeHandled(newRegion)) {
+ if (progress.hasResponseStarted(region)) {
+ // already started sending responses for the original region, so
we can't switch to
+ // the new regions because that would violate the contract of
the callbacks
+ progress.onResponse(region, null, new DoNotRetryIOException(
+ "Region " + region.getEncodedName() + " no longer exists,
likely due to a split or"
+ + " merge, and the coprocessor service was already in
progress and can't be recovered"
+ ));
+ } else {
+ LOG.debug("Attempted to send a coprocessor service RPC to region
{} which no"
+ + " longer exists, will attempt to send RPCs to the
region(s) that replaced it",
+ region.getEncodedName());
+ restartCoprocessorServiceForRange(stubMaker, callable, callback,
region.getStartKey(),
Review Comment:
This is where we submit a new call for the key range that was covered by the
region that was split or merged.
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannelImpl.java:
##########
@@ -78,8 +83,7 @@ private CompletableFuture<Message> rpcCall(MethodDescriptor
method, Message requ
final Context context = Context.current();
CompletableFuture<Message> future = new CompletableFuture<>();
if (region != null && !Bytes.equals(loc.getRegion().getRegionName(),
region.getRegionName())) {
- future.completeExceptionally(new DoNotRetryIOException("Region name is
changed, expected "
- + region.getRegionNameAsString() + ", actual " +
loc.getRegion().getRegionNameAsString()));
+ future.completeExceptionally(new
RegionNameChangedException(loc.getRegion()));
Review Comment:
This works, but I wonder whether there is a better place in the client to detect
this scenario.
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java:
##########
@@ -796,52 +797,102 @@ private boolean locateFinished(RegionInfo region, byte[]
endKey, boolean endKeyI
private <S, R> void coprocessorServiceUntilComplete(Function<RpcChannel, S>
stubMaker,
ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R>
callback,
- AtomicBoolean locateFinished, AtomicInteger unfinishedRequest, RegionInfo
region, Span span) {
+ MultiRegionCoprocessorServiceProgress<R> progress, RegionInfo region, Span
span) {
addListener(coprocessorService(stubMaker, callable, region,
region.getStartKey()), (r, e) -> {
try (Scope ignored = span.makeCurrent()) {
- if (e != null) {
- callback.onRegionError(region, e);
- } else {
- callback.onRegionComplete(region, r);
- }
+ if (e instanceof RegionNameChangedException) {
+ RegionInfo newRegion =
((RegionNameChangedException)e).getNewRegionInfo();
+ if (progress.markNewRegionAndCheckNeedsToBeHandled(newRegion)) {
+ if (progress.hasResponseStarted(region)) {
+ // already started sending responses for the original region, so
we can't switch to
+ // the new regions because that would violate the contract of
the callbacks
+ progress.onResponse(region, null, new DoNotRetryIOException(
+ "Region " + region.getEncodedName() + " no longer exists,
likely due to a split or"
+ + " merge, and the coprocessor service was already in
progress and can't be recovered"
+ ));
Review Comment:
In this case, `CoprocessorCallback::onRegionComplete` has already been
called at least once for this region, and I think it would be better to throw an
error in this edge case than to try to make the `CoprocessorCallback`
handle this scenario. For example, suppose we had already sent these
callbacks:
```
callback.onRegionComplete(r1, { sum => 10, next => 0x01 });
callback.onRegionComplete(r1, { sum => 7, next => 0x02 });
[r1 is split into r2 and r3]
```
How would we signal to the caller that the results for region r1 are
no longer relevant, and that they should now treat those results as
replaced by the results for r2 and r3? Again, I think keeping the interface as is and
continuing to throw an error in this edge case would be better, but I'm open to
other thoughts.
##########
hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncTableCoprocessorEndpoint.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.AsyncTable.CoprocessorCallback;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.ServiceCaller;
+import org.apache.hadoop.hbase.client.Table;
+import
org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.ColumnAggregationService;
+import
org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumRequest;
+import
org.apache.hadoop.hbase.coprocessor.protobuf.generated.ColumnAggregationProtos.SumResponse;
+import
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.AddrResponseProto;
+import
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
+import
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
+import
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
+import
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyResponseProto;
+import
org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.PauseRequestProto;
+import
org.apache.hadoop.hbase.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class, CoprocessorTests.class })
+public class TestAsyncTableCoprocessorEndpoint {
Review Comment:
We do have other async coprocessor endpoint tests, but the only one I could
find (`TestAsyncCoprocessorEndpoint`) covers the admin client, not the table client.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]