smengcl commented on code in PR #6014:
URL: https://github.com/apache/ozone/pull/6014#discussion_r1630027761


##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java:
##########
@@ -218,29 +226,69 @@ public ConcurrentMap<UUID, Long> getCommitInfoMap() {
     return commitInfoMap;
   }
 
-  private CompletableFuture<RaftClientReply> sendRequestAsync(
-      ContainerCommandRequestProto request) {
-    return TracingUtil.executeInNewSpan(
-        "XceiverClientRatis." + request.getCmdType().name(),
-        () -> {
-          final ContainerCommandRequestMessage message
-              = ContainerCommandRequestMessage.toMessage(
-              request, TracingUtil.exportCurrentSpan());
-          if (HddsUtils.isReadOnly(request)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("sendCommandAsync ReadOnly {}", message);
-            }
-            return getClient().async().sendReadOnly(message);
+  private CompletableFuture<RaftClientReply> sendRequestAsyncInternal(
+      ContainerCommandRequestProto request, ReplicationLevel 
writeReplicationLevel) {
+    final ContainerCommandRequestMessage message =
+        ContainerCommandRequestMessage.toMessage(request, 
TracingUtil.exportCurrentSpan());
+    if (HddsUtils.isReadOnly(request)) {
+      LOG.debug("sendCommandAsync ReadOnly message {}", message);
+      return getClient().async().sendReadOnly(message);
+    } else {
+      LOG.debug("sendCommandAsync Write {} message {}", writeReplicationLevel, 
message);
+      return getClient().async().send(message, 
writeReplicationLevel).handle((reply, e) -> {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("sendCommandAsync Write {} message {}: reply = {}, 
exception =",
+              writeReplicationLevel, message, reply, e);
+        }
+        if (reply != null) {
+          // If Raft server gives a reply, just return it.
+          return reply;
+        }
+        // reply == null implies exception != null with the current Raft 
server implementation
+        LOG.debug("reply is null, crafting a new RaftClientReply");
+        final Collection<CommitInfoProto> commitInfos;
+        final long maxCommitIndex;
+        if (e instanceof CompletionException) {
+          final CompletionException ce = (CompletionException) e;
+          if (e.getCause() instanceof NotReplicatedException) {
+            // Unwrap exception to get commitInfos if NotReplicatedException 
is thrown
+            final NotReplicatedException nre = (NotReplicatedException) 
ce.getCause();
+            commitInfos = nre.getCommitInfos();
+            maxCommitIndex = commitInfos.stream()
+                .map(CommitInfoProto::getCommitIndex)
+                .max(Long::compareTo).orElse(-2L);
           } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("sendCommandAsync {}", message);
-            }
-            return getClient().async().send(message);
+            // Otherwise just throw the exception as is
+            throw ce;
           }
-
+        } else {
+          throw new CompletionException("Unexpected exception", e);
         }
+        RaftGroupMemberId raftId = 
RaftGroupMemberId.valueOf(RaftPeerId.valueOf("peer"), 
RaftGroupId.emptyGroupId());

Review Comment:
   This is just a dummy field that is unfortunately required by the 
proto. What we actually need is just the CommitInfos.



##########
hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java:
##########
@@ -218,29 +226,69 @@ public ConcurrentMap<UUID, Long> getCommitInfoMap() {
     return commitInfoMap;
   }
 
-  private CompletableFuture<RaftClientReply> sendRequestAsync(
-      ContainerCommandRequestProto request) {
-    return TracingUtil.executeInNewSpan(
-        "XceiverClientRatis." + request.getCmdType().name(),
-        () -> {
-          final ContainerCommandRequestMessage message
-              = ContainerCommandRequestMessage.toMessage(
-              request, TracingUtil.exportCurrentSpan());
-          if (HddsUtils.isReadOnly(request)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("sendCommandAsync ReadOnly {}", message);
-            }
-            return getClient().async().sendReadOnly(message);
+  private CompletableFuture<RaftClientReply> sendRequestAsyncInternal(
+      ContainerCommandRequestProto request, ReplicationLevel 
writeReplicationLevel) {
+    final ContainerCommandRequestMessage message =
+        ContainerCommandRequestMessage.toMessage(request, 
TracingUtil.exportCurrentSpan());
+    if (HddsUtils.isReadOnly(request)) {
+      LOG.debug("sendCommandAsync ReadOnly message {}", message);
+      return getClient().async().sendReadOnly(message);
+    } else {
+      LOG.debug("sendCommandAsync Write {} message {}", writeReplicationLevel, 
message);
+      return getClient().async().send(message, 
writeReplicationLevel).handle((reply, e) -> {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("sendCommandAsync Write {} message {}: reply = {}, 
exception =",
+              writeReplicationLevel, message, reply, e);
+        }
+        if (reply != null) {
+          // If Raft server gives a reply, just return it.
+          return reply;
+        }
+        // reply == null implies exception != null with the current Raft 
server implementation
+        LOG.debug("reply is null, crafting a new RaftClientReply");
+        final Collection<CommitInfoProto> commitInfos;
+        final long maxCommitIndex;
+        if (e instanceof CompletionException) {
+          final CompletionException ce = (CompletionException) e;
+          if (e.getCause() instanceof NotReplicatedException) {
+            // Unwrap exception to get commitInfos if NotReplicatedException 
is thrown
+            final NotReplicatedException nre = (NotReplicatedException) 
ce.getCause();
+            commitInfos = nre.getCommitInfos();
+            maxCommitIndex = commitInfos.stream()
+                .map(CommitInfoProto::getCommitIndex)
+                .max(Long::compareTo).orElse(-2L);
           } else {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("sendCommandAsync {}", message);
-            }
-            return getClient().async().send(message);
+            // Otherwise just throw the exception as is
+            throw ce;
           }
-
+        } else {
+          throw new CompletionException("Unexpected exception", e);
         }
+        RaftGroupMemberId raftId = 
RaftGroupMemberId.valueOf(RaftPeerId.valueOf("peer"), 
RaftGroupId.emptyGroupId());

Review Comment:
   This is actually just a dummy field that is unfortunately required by the 
proto. What we really need is just the CommitInfos.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to