This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 5561f21ef6 HDDS-11174. [hsync] Change XceiverClientRatis.watchForCommit to async. (#6941)
5561f21ef6 is described below

commit 5561f21ef62b34679421a7267bae84707ffd3e1c
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Aug 7 15:24:10 2024 -0700

    HDDS-11174. [hsync] Change XceiverClientRatis.watchForCommit to async. (#6941)
    
    Co-authored-by: Siyao Meng <[email protected]>
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  9 ----
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java | 53 +++++++++-------------
 .../hadoop/hdds/scm/client/HddsClientUtils.java    |  3 +-
 .../hdds/scm/storage/AbstractCommitWatcher.java    | 17 ++++---
 .../storage/TestBlockOutputStreamCorrectness.java  |  4 +-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |  8 ++--
 .../hadoop/ozone/client/MockXceiverClientSpi.java  |  5 --
 .../hadoop/ozone/client/rpc/TestCommitInRatis.java |  4 +-
 .../ozone/client/rpc/TestWatchForCommit.java       |  9 ++--
 .../hadoop/ozone/freon/DatanodeChunkGenerator.java |  3 +-
 10 files changed, 46 insertions(+), 69 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 5cd41edd38..c02306f8af 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
-import java.util.concurrent.TimeoutException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -649,14 +648,6 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     }
   }
 
-  @Override
-  public XceiverClientReply watchForCommit(long index)
-      throws InterruptedException, ExecutionException, TimeoutException,
-      IOException {
-    // there is no notion of watch for commit index in standalone pipeline
-    return null;
-  }
-
   @Override
   public long getReplicatedMinCommitIndex() {
     return 0;
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index b67f4a56ec..2794ca9c61 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -29,8 +29,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Stream;
 
@@ -66,6 +64,7 @@ import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -295,46 +294,39 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
   }
 
   @Override
-  public XceiverClientReply watchForCommit(long index)
-      throws InterruptedException, ExecutionException, TimeoutException,
-      IOException {
+  public CompletableFuture<XceiverClientReply> watchForCommit(long index) {
     final long replicatedMin = getReplicatedMinCommitIndex();
     if (replicatedMin >= index) {
-      return newWatchReply(index, "replicatedMin", replicatedMin);
+      return CompletableFuture.completedFuture(newWatchReply(index, 
"replicatedMin", replicatedMin));
     }
 
-    try {
-      CompletableFuture<RaftClientReply> replyFuture = 
getClient().async().watch(index, watchType);
-      final RaftClientReply reply = replyFuture.get();
+    final CompletableFuture<XceiverClientReply> replyFuture = new 
CompletableFuture<>();
+    getClient().async().watch(index, watchType).thenAccept(reply -> {
       final long updated = updateCommitInfosMap(reply, watchType);
-      Preconditions.checkState(updated >= index, "Returned index " + updated + 
" is smaller than expected " + index);
-      return newWatchReply(index, watchType, updated);
-    } catch (Exception e) {
+      Preconditions.checkState(updated >= index, "Returned index " + updated + 
" < expected " + index);
+      replyFuture.complete(newWatchReply(index, watchType, updated));
+    }).exceptionally(e -> {
       LOG.warn("{} way commit failed on pipeline {}", watchType, pipeline, e);
-      Throwable t =
-          HddsClientUtils.containsException(e, GroupMismatchException.class);
-      if (t != null) {
-        throw e;
-      }
-      if (watchType == ReplicationLevel.ALL_COMMITTED) {
-        Throwable nre =
-            HddsClientUtils.containsException(e, NotReplicatedException.class);
-        Collection<CommitInfoProto> commitInfoProtoList;
+      final boolean isGroupMismatch = HddsClientUtils.containsException(e, 
GroupMismatchException.class) != null;
+      if (!isGroupMismatch && watchType == ReplicationLevel.ALL_COMMITTED) {
+        final Throwable nre = HddsClientUtils.containsException(e, 
NotReplicatedException.class);
         if (nre instanceof NotReplicatedException) {
           // If NotReplicatedException is thrown from the Datanode leader
           // we can save one watch request round trip by using the 
CommitInfoProto
           // in the NotReplicatedException
-          commitInfoProtoList = ((NotReplicatedException) 
nre).getCommitInfos();
+          final Collection<CommitInfoProto> commitInfoProtoList = 
((NotReplicatedException) nre).getCommitInfos();
+          replyFuture.complete(handleFailedAllCommit(index, 
commitInfoProtoList));
         } else {
-          final RaftClientReply reply = getClient().async()
-              .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
-              .get();
-          commitInfoProtoList = reply.getCommitInfos();
+          getClient().async().watch(index, ReplicationLevel.MAJORITY_COMMITTED)
+              .thenApply(reply -> handleFailedAllCommit(index, 
reply.getCommitInfos()))
+              .whenComplete(JavaUtils.asBiConsumer(replyFuture));
         }
-        return handleFailedAllCommit(index, commitInfoProtoList);
+      } else {
+        replyFuture.completeExceptionally(e);
       }
-      throw e;
-    }
+      return null;
+    });
+    return replyFuture;
   }
 
   private XceiverClientReply handleFailedAllCommit(long index, 
Collection<CommitInfoProto> commitInfoProtoList) {
@@ -374,8 +366,7 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
     CompletableFuture<ContainerCommandResponseProto> containerCommandResponse =
         raftClientReply.whenComplete((reply, e) -> {
           if (LOG.isDebugEnabled()) {
-            LOG.debug("received reply {} for request: cmdType={} 
containerID={}"
-                    + " pipelineID={} traceID={} exception: {}", reply,
+            LOG.debug("received reply {} for request: cmdType={}, 
containerID={}, pipelineID={}, traceID={}", reply,
                 request.getCmdType(), request.getContainerID(),
                 request.getPipelineID(), request.getTraceID(), e);
           }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
index 6c5f9a0a98..f5a7c0ad55 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/HddsClientUtils.java
@@ -254,9 +254,8 @@ public final class HddsClientUtils {
 
   // This will return the underlying expected exception if it exists
   // in an exception trace. Otherwise, returns null.
-  public static Throwable containsException(Exception e,
+  public static Throwable containsException(Throwable t,
             Class<? extends Exception> expectedExceptionClass) {
-    Throwable t = e;
     while (t != null) {
       if (expectedExceptionClass.isInstance(t)) {
         return t;
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
index fb489d0d0c..61bc73420e 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/AbstractCommitWatcher.java
@@ -37,7 +37,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -127,19 +126,17 @@ abstract class AbstractCommitWatcher<BUFFER> {
    * @return minimum commit index replicated to all nodes
    * @throws IOException IOException in case watch gets timed out
    */
-  XceiverClientReply watchForCommit(long commitIndex)
-      throws IOException {
+  CompletableFuture<XceiverClientReply> watchForCommitAsync(long commitIndex) {
     final MemoizedSupplier<CompletableFuture<XceiverClientReply>> supplier
         = JavaUtils.memoize(CompletableFuture::new);
     final CompletableFuture<XceiverClientReply> f = 
replies.compute(commitIndex,
         (key, value) -> value != null ? value : supplier.get());
     if (!supplier.isInitialized()) {
       // future already exists
-      return f.join();
+      return f;
     }
 
-    try {
-      final XceiverClientReply reply = client.watchForCommit(commitIndex);
+    return client.watchForCommit(commitIndex).thenApply(reply -> {
       f.complete(reply);
       final CompletableFuture<XceiverClientReply> removed = 
replies.remove(commitIndex);
       Preconditions.checkState(removed == f);
@@ -147,11 +144,17 @@ abstract class AbstractCommitWatcher<BUFFER> {
       final long index = reply != null ? reply.getLogIndex() : 0;
       adjustBuffers(index);
       return reply;
+    });
+  }
+
+  XceiverClientReply watchForCommit(long commitIndex) throws IOException {
+    try {
+      return watchForCommitAsync(commitIndex).get();
     } catch (InterruptedException e) {
       // Re-interrupt the thread while catching InterruptedException
       Thread.currentThread().interrupt();
       throw getIOExceptionForWatchForCommit(commitIndex, e);
-    } catch (TimeoutException | ExecutionException e) {
+    } catch (ExecutionException e) {
       throw getIOExceptionForWatchForCommit(commitIndex, e);
     }
   }
diff --git 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index df55b5bf57..df4d1cb3f8 100644
--- 
a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ 
b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -276,7 +276,7 @@ class TestBlockOutputStreamCorrectness {
     }
 
     @Override
-    public XceiverClientReply watchForCommit(long index) {
+    public CompletableFuture<XceiverClientReply> watchForCommit(long index) {
       final ContainerCommandResponseProto.Builder builder =
           ContainerCommandResponseProto.newBuilder()
               .setCmdType(Type.WriteChunk)
@@ -284,7 +284,7 @@ class TestBlockOutputStreamCorrectness {
       final XceiverClientReply xceiverClientReply = new XceiverClientReply(
           CompletableFuture.completedFuture(builder.build()));
       xceiverClientReply.setLogIndex(index);
-      return xceiverClientReply;
+      return CompletableFuture.completedFuture(xceiverClientReply);
     }
 
     @Override
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index f6529e84bd..9ac32c469c 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -22,8 +22,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdds.HddsUtils;
@@ -176,9 +176,9 @@ public abstract class XceiverClientSpi implements Closeable 
{
    * @return reply containing the min commit index replicated to all or 
majority
    *         servers in case of a failure
    */
-  public abstract XceiverClientReply watchForCommit(long index)
-      throws InterruptedException, ExecutionException, TimeoutException,
-      IOException;
+  public CompletableFuture<XceiverClientReply> watchForCommit(long index) {
+    return CompletableFuture.completedFuture(null);
+  }
 
   /**
    * returns the min commit index replicated to all servers.
diff --git 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
index 0d82f0f8bb..b14582c8ea 100644
--- 
a/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
+++ 
b/hadoop-ozone/client/src/test/java/org/apache/hadoop/ozone/client/MockXceiverClientSpi.java
@@ -187,11 +187,6 @@ public class MockXceiverClientSpi extends XceiverClientSpi 
{
     return pipeline.getType();
   }
 
-  @Override
-  public XceiverClientReply watchForCommit(long index) {
-    return null;
-  }
-
   @Override
   public long getReplicatedMinCommitIndex() {
     return 0;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
index f7fbbf37c5..4ff671df61 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
@@ -163,7 +163,7 @@ public class TestCommitInRatis {
     reply.getResponse().get();
     assertEquals(3, ratisClient.getCommitInfoMap().size());
     // wait for the container to be created on all the nodes
-    xceiverClient.watchForCommit(reply.getLogIndex());
+    xceiverClient.watchForCommit(reply.getLogIndex()).get();
     for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
       // shutdown the ratis follower
       if (RatisTestHelper.isRatisFollower(dn, pipeline)) {
@@ -175,7 +175,7 @@ public class TestCommitInRatis {
         .getCloseContainer(pipeline,
             container1.getContainerInfo().getContainerID()));
     reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex());
+    xceiverClient.watchForCommit(reply.getLogIndex()).get();
 
     if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
       // commitInfo Map will be reduced to 2 here
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index f42969e67f..bec14b23b0 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -291,7 +291,7 @@ public class TestWatchForCommit {
         // as well as there is no logIndex generate in Ratis.
         // The basic idea here is just to test if its throws an exception.
         ExecutionException e = assertThrows(ExecutionException.class,
-            () -> xceiverClient.watchForCommit(index + RandomUtils.nextInt(0, 
100) + 10));
+            () -> xceiverClient.watchForCommit(index + RandomUtils.nextInt(0, 
100) + 10).get());
         // since the timeout value is quite long, the watch request will either
         // fail with NotReplicated exceptio, RetryFailureException or
         // RuntimeException
@@ -348,7 +348,7 @@ public class TestWatchForCommit {
             .getCloseContainer(pipeline,
                 container1.getContainerInfo().getContainerID()));
         reply.getResponse().get();
-        xceiverClient.watchForCommit(reply.getLogIndex());
+        xceiverClient.watchForCommit(reply.getLogIndex()).get();
 
         // commitInfo Map will be reduced to 2 here
         if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
@@ -392,9 +392,8 @@ public class TestWatchForCommit {
         // just watch for a log index which in not updated in the commitInfo 
Map
         // as well as there is no logIndex generate in Ratis.
         // The basic idea here is just to test if its throws an exception.
-        Exception e =
-            assertThrows(Exception.class,
-                () -> xceiverClient.watchForCommit(reply.getLogIndex() + 
RandomUtils.nextInt(0, 100) + 10));
+        final Exception e = assertThrows(Exception.class,
+            () -> xceiverClient.watchForCommit(reply.getLogIndex() + 
RandomUtils.nextInt(0, 100) + 10).get());
         assertInstanceOf(GroupMismatchException.class, 
HddsClientUtils.checkForException(e));
       } finally {
         clientManager.releaseClient(xceiverClient, false);
diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
index 6362f32d04..23988106d4 100644
--- 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
+++ 
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
@@ -246,8 +246,7 @@ public class DatanodeChunkGenerator extends 
BaseFreonGenerator implements
       if (async) {
         XceiverClientReply xceiverClientReply =
             xceiverClientSpi.sendCommandAsync(request);
-        xceiverClientSpi
-            .watchForCommit(xceiverClientReply.getLogIndex());
+        
xceiverClientSpi.watchForCommit(xceiverClientReply.getLogIndex()).get();
 
       } else {
         xceiverClientSpi.sendCommand(request);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to