This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 5561f21ef6 HDDS-11174. [hsync] Change
XceiverClientRatis.watchForCommit to async. (#6941)
5561f21ef6 is described below
commit 5561f21ef62b34679421a7267bae84707ffd3e1c
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Aug 7 15:24:10 2024 -0700
HDDS-11174. [hsync] Change XceiverClientRatis.watchForCommit to async.
(#6941)
Co-authored-by: Siyao Meng <[email protected]>
---
.../apache/hadoop/hdds/scm/XceiverClientGrpc.java | 9 ----
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 53 +++++++++-------------
.../hadoop/hdds/scm/client/HddsClientUtils.java | 3 +-
.../hdds/scm/storage/AbstractCommitWatcher.java | 17 ++++---
.../storage/TestBlockOutputStreamCorrectness.java | 4 +-
.../apache/hadoop/hdds/scm/XceiverClientSpi.java | 8 ++--
.../hadoop/ozone/client/MockXceiverClientSpi.java | 5 --
.../hadoop/ozone/client/rpc/TestCommitInRatis.java | 4 +-
.../ozone/client/rpc/TestWatchForCommit.java | 9 ++--
.../hadoop/ozone/freon/DatanodeChunkGenerator.java | 3 +-
10 files changed, 46 insertions(+), 69 deletions(-)
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 5cd41edd38..c02306f8af 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
-import java.util.concurrent.TimeoutException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -649,14 +648,6 @@ public class XceiverClientGrpc extends XceiverClientSpi {
}
}
- @Override
- public XceiverClientReply watchForCommit(long index)
- throws InterruptedException, ExecutionException, TimeoutException,
- IOException {
- // there is no notion of watch for commit index in standalone pipeline
- return null;
- }
-
@Override
public long getReplicatedMinCommitIndex() {
return 0;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index b67f4a56ec..2794ca9c61 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -29,8 +29,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@@ -66,6 +64,7 @@ import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -295,46 +294,39 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
}
@Override
- public XceiverClientReply watchForCommit(long index)
- throws InterruptedException, ExecutionException, TimeoutException,
- IOException {
+ public CompletableFuture<XceiverClientReply> watchForCommit(long index) {
final long replicatedMin = getReplicatedMinCommitIndex();
if (replicatedMin >= index) {
- return newWatchReply(index, "replicatedMin", replicatedMin);
+ return CompletableFuture.completedFuture(newWatchReply(index,
"replicatedMin", replicatedMin));
}
- try {
- CompletableFuture<RaftClientReply> replyFuture =
getClient().async().watch(index, watchType);
- final RaftClientReply reply = replyFuture.get();
+ final CompletableFuture<XceiverClientReply> replyFuture = new
CompletableFuture<>();
+ getClient().async().watch(index, watchType).thenAccept(reply -> {
final long updated = updateCommitInfosMap(reply, watchType);
- Preconditions.checkState(updated >= index, "Returned index " + updated +
" is smaller than expected " + index);
- return newWatchReply(index, watchType, updated);
- } catch (Exception e) {
+ Preconditions.checkState(updated >= index, "Returned index " + updated +
" < expected " + index);
+ replyFuture.complete(newWatchReply(index, watchType, updated));
+ }).exceptionally(e -> {
LOG.warn("{} way commit failed on pipeline {}", watchType, pipeline, e);
- Throwable t =
- HddsClientUtils.containsException(e, GroupMismatchException.class);
- if (t != null) {
- throw e;
- }
- if (watchType == ReplicationLevel.ALL_COMMITTED) {
- Throwable nre =
- HddsClientUtils.containsException(e, NotReplicatedException.class);
- Collection<CommitInfoProto> commitInfoProtoList;
+ final boolean isGroupMismatch = HddsClientUtils.containsException(e,
GroupMismatchException.class) != null;
+ if (!isGroupMismatch && watchType == ReplicationLevel.ALL_COMMITTED) {
+ final Throwable nre = HddsClientUtils.containsException(e,
NotReplicatedException.class);
if (nre instanceof NotReplicatedException) {
// If NotReplicatedException is thrown from the Datanode leader
// we can save one watch request round trip by using the
CommitInfoProto
// in the NotReplicatedException
- commitInfoProtoList = ((NotReplicatedException)
nre).getCommitInfos();
+ final Collection<CommitInfoProto> commitInfoProtoList =
((NotReplicatedException) nre).getCommitInfos();
+ replyFuture.complete(handleFailedAllCommit(index,
commitInfoProtoList));
} else {
- final RaftClientReply reply = getClient().async()
- .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
- .get();
- commitInfoProtoList = reply.getCommitInfos();
+ getClient().async().watch(index, ReplicationLevel.MAJORITY_COMMITTED)
+ .thenApply(reply -> handleFailedAllCommit(index,
reply.getCommitInfos()))
+ .whenComplete(JavaUtils.asBiConsumer(replyFuture));
}
- return handleFailedAllCommit(index, commitInfoProtoList);
+ } else {
+ replyFuture.completeExceptionally(e);
}
- throw e;
- }
+ return null;
+ });
+ return replyFuture;
}
private XceiverClientReply handleFailedAllCommit(long index,
Collection<CommitInfoProto> commitInfoProtoList) {
@@ -374,8 +366,7 @@ public final class XceiverClientRatis extends
XceiverClientSpi {
CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
raftClientReply.whenComplete((reply, e) -> {
if (LOG.isDebugEnabled()) {
- LOG.debug("received reply {} for request: cmdType={}
containerID={}"
- + " pipelineID={} traceID={} exception: {}", reply,
+ LOG.debug("received reply {} for request: cmdType={},
containerID={}, pipelineID={}, traceID={}", reply,
request.getCmdType(), request.getContainerID(),
request.getPipelineID(), request.getTraceID(), e);
}
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
index 6c5f9a0a98..f5a7c0ad55 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
@@ -254,9 +254,8 @@ public final class HddsClientUtils {
// This will return the underlying expected exception if it exists
// in an exception trace. Otherwise, returns null.
- public static Throwable containsException(Exception e,
+ public static Throwable containsException(Throwable t,
Class<? extends Exception> expectedExceptionClass) {
- Throwable t = e;
while (t != null) {
if (expectedExceptionClass.isInstance(t)) {
return t;
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index fb489d0d0c..61bc73420e 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -127,19 +126,17 @@ abstract class AbstractCommitWatcher<BUFFER> {
* @return minimum commit index replicated to all nodes
* @throws IOException IOException in case watch gets timed out
*/
- XceiverClientReply watchForCommit(long commitIndex)
- throws IOException {
+ CompletableFuture<XceiverClientReply> watchForCommitAsync(long commitIndex) {
final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
= JavaUtils.memoize(CompletableFuture::new);
final CompletableFuture<XceiverClientReply> f =
replies.compute(commitIndex,
(key, value) -> value != null ? value : supplier.get());
if (!supplier.isInitialized()) {
// future already exists
- return f.join();
+ return f;
}
- try {
- final XceiverClientReply reply = client.watchForCommit(commitIndex);
+ return client.watchForCommit(commitIndex).thenApply(reply -> {
f.complete(reply);
final CompletableFuture<XceiverClientReply> removed =
replies.remove(commitIndex);
Preconditions.checkState(removed == f);
@@ -147,11 +144,17 @@ abstract class AbstractCommitWatcher<BUFFER> {
final long index = reply != null ? reply.getLogIndex() : 0;
adjustBuffers(index);
return reply;
+ });
+ }
+
+ XceiverClientReply watchForCommit(long commitIndex) throws IOException {
+ try {
+ return watchForCommitAsync(commitIndex).get();
} catch (InterruptedException e) {
// Re-interrupt the thread while catching InterruptedException
Thread.currentThread().interrupt();
throw getIOExceptionForWatchForCommit(commitIndex, e);
- } catch (TimeoutException | ExecutionException e) {
+ } catch (ExecutionException e) {
throw getIOExceptionForWatchForCommit(commitIndex, e);
}
}
diff --git
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index df55b5bf57..df4d1cb3f8 100644
---
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -276,7 +276,7 @@ class TestBlockOutputStreamCorrectness {
}
@Override
- public XceiverClientReply watchForCommit(long index) {
+ public CompletableFuture<XceiverClientReply> watchForCommit(long index) {
final ContainerCommandResponseProto.Builder builder =
ContainerCommandResponseProto.newBuilder()
.setCmdType(Type.WriteChunk)
@@ -284,7 +284,7 @@ class TestBlockOutputStreamCorrectness {
final XceiverClientReply xceiverClientReply = new XceiverClientReply(
CompletableFuture.completedFuture(builder.build()));
xceiverClientReply.setLogIndex(index);
- return xceiverClientReply;
+ return CompletableFuture.completedFuture(xceiverClientReply);
}
@Override
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index f6529e84bd..9ac32c469c 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -22,8 +22,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.HddsUtils;
@@ -176,9 +176,9 @@ public abstract class XceiverClientSpi implements Closeable
{
* @return reply containing the min commit index replicated to all or
majority
* servers in case of a failure
*/
- public abstract XceiverClientReply watchForCommit(long index)
- throws InterruptedException, ExecutionException, TimeoutException,
- IOException;
+ public CompletableFuture<XceiverClientReply> watchForCommit(long index) {
+ return CompletableFuture.completedFuture(null);
+ }
/**
* returns the min commit index replicated to all servers.
diff --git
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 0d82f0f8bb..b14582c8ea 100644
---
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -187,11 +187,6 @@ public class MockXceiverClientSpi extends XceiverClientSpi
{
return pipeline.getType();
}
- @Override
- public XceiverClientReply watchForCommit(long index) {
- return null;
- }
-
@Override
public long getReplicatedMinCommitIndex() {
return 0;
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
index f7fbbf37c5..4ff671df61 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
@@ -163,7 +163,7 @@ public class TestCommitInRatis {
reply.getResponse().get();
assertEquals(3, ratisClient.getCommitInfoMap().size());
// wait for the container to be created on all the nodes
- xceiverClient.watchForCommit(reply.getLogIndex());
+ xceiverClient.watchForCommit(reply.getLogIndex()).get();
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
// shutdown the ratis follower
if (RatisTestHelper.isRatisFollower(dn, pipeline)) {
@@ -175,7 +175,7 @@ public class TestCommitInRatis {
.getCloseContainer(pipeline,
container1.getContainerInfo().getContainerID()));
reply.getResponse().get();
- xceiverClient.watchForCommit(reply.getLogIndex());
+ xceiverClient.watchForCommit(reply.getLogIndex()).get();
if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
// commitInfo Map will be reduced to 2 here
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index f42969e67f..bec14b23b0 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -291,7 +291,7 @@ public class TestWatchForCommit {
// as well as there is no logIndex generate in Ratis.
// The basic idea here is just to test if it throws an exception.
ExecutionException e = assertThrows(ExecutionException.class,
- () -> xceiverClient.watchForCommit(index + RandomUtils.nextInt(0,
100) + 10));
+ () -> xceiverClient.watchForCommit(index + RandomUtils.nextInt(0,
100) + 10).get());
// since the timeout value is quite long, the watch request will either
// fail with NotReplicatedException, RetryFailureException or
// RuntimeException
@@ -348,7 +348,7 @@ public class TestWatchForCommit {
.getCloseContainer(pipeline,
container1.getContainerInfo().getContainerID()));
reply.getResponse().get();
- xceiverClient.watchForCommit(reply.getLogIndex());
+ xceiverClient.watchForCommit(reply.getLogIndex()).get();
// commitInfo Map will be reduced to 2 here
if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
@@ -392,9 +392,8 @@ public class TestWatchForCommit {
// just watch for a log index which in not updated in the commitInfo
Map
// as well as there is no logIndex generate in Ratis.
// The basic idea here is just to test if it throws an exception.
- Exception e =
- assertThrows(Exception.class,
- () -> xceiverClient.watchForCommit(reply.getLogIndex() +
RandomUtils.nextInt(0, 100) + 10));
+ final Exception e = assertThrows(Exception.class,
+ () -> xceiverClient.watchForCommit(reply.getLogIndex() +
RandomUtils.nextInt(0, 100) + 10).get());
assertInstanceOf(GroupMismatchException.class,
HddsClientUtils.checkForException(e));
} finally {
clientManager.releaseClient(xceiverClient, false);
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
index 6362f32d04..23988106d4 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
@@ -246,8 +246,7 @@ public class DatanodeChunkGenerator extends
BaseFreonGenerator implements
if (async) {
XceiverClientReply xceiverClientReply =
xceiverClientSpi.sendCommandAsync(request);
- xceiverClientSpi
- .watchForCommit(xceiverClientReply.getLogIndex());
+
xceiverClientSpi.watchForCommit(xceiverClientReply.getLogIndex()).get();
} else {
xceiverClientSpi.sendCommand(request);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]