szetszwo commented on code in PR #1264:
URL: https://github.com/apache/ratis/pull/1264#discussion_r2094649885


##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientRpc.class);
+
+  private Supplier<String> name;
+  private ClientId cId;

Review Comment:
   Let's rename it to `clientId`.  We sometimes use `cid` for `callId`, so `cId` is easy to confuse with it.



##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientRpc.class);
+
+  private Supplier<String> name;

Review Comment:
   This field is not needed.  Just build the name inside the method where it is used.



##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientRpc.class);
+
+  private Supplier<String> name;
+  private ClientId cId;
+  private final TimeDuration requestTimeoutDuration;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
   public NettyClientRpc(ClientId clientId, RaftProperties properties) {
     super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+    this.cId = clientId;
+    this.requestTimeoutDuration = 
RaftClientConfigKeys.Rpc.requestTimeout(properties);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest 
request) {
     final RaftPeerId serverId = request.getServerId();
+    long callId = request.getCallId();
     try {
+      name = JavaUtils.memoize(() -> cId + "->" + serverId);
       final NettyRpcProxy proxy = getProxies().getProxy(serverId);
       final RaftNettyServerRequestProto serverRequestProto = 
buildRequestProto(request);
-      return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+      final CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
+
+      proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
         if (request instanceof GroupListRequest) {
           return 
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
         } else if (request instanceof GroupInfoRequest) {
           return 
ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
         } else {
           return 
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
         }
+      }).thenCompose(raftReply -> {
+        final NotLeaderException nle = raftReply.getNotLeaderException();
+        if (nle != null) {
+          return failedFuture(nle);
+        }
+        final LeaderNotReadyException lnre = 
raftReply.getLeaderNotReadyException();
+        if (lnre != null) {
+          return failedFuture(lnre);
+        }
+        return CompletableFuture.completedFuture(raftReply);
+      }).whenComplete((raftReply, ex) -> {
+        if (ex != null) {
+          replyFuture.completeExceptionally(ex);
+        } else {
+          replyFuture.complete(raftReply);
+        }

Review Comment:
   We should combine the `thenCompose` and `whenComplete` stages into a single `whenComplete`:
   ```java
           if (e == null) {
             if (reply == null) {
               e = new NullPointerException("Both reply==null && e==null");
             }
             if (e == null) {
               e = reply.getNotLeaderException();
             }
             if (e == null) {
               e = reply.getLeaderNotReadyException();
             }
           }
           if (e != null) {
             replyFuture.completeExceptionally(e);
           } else {
             replyFuture.complete(reply);
           }
   ```



##########
ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java:
##########
@@ -417,13 +422,13 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws 
Exception {
     LOG.info("Running testAppendEntriesTimeout");
     final TimeDuration oldExpiryTime = 
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
     RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), 
TimeDuration.valueOf(20, TimeUnit.SECONDS));
-    waitForLeader(cluster);
+    RaftPeerId id = waitForLeader(cluster).getId();
     long time = System.currentTimeMillis();
     long waitTime = 5000;
     try (final RaftClient client = cluster.createClient()) {
       // block append requests
       cluster.getServerAliveStream()
-          .filter(impl -> !impl.getInfo().isLeader())
+          .filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() 
!= id)

Review Comment:
   We should use `!equals(..)` instead of `!=`, since `!=` compares object references rather than the id values.



##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientRpc.class);
+
+  private Supplier<String> name;
+  private ClientId cId;
+  private final TimeDuration requestTimeoutDuration;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
   public NettyClientRpc(ClientId clientId, RaftProperties properties) {
     super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+    this.cId = clientId;
+    this.requestTimeoutDuration = 
RaftClientConfigKeys.Rpc.requestTimeout(properties);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest 
request) {
     final RaftPeerId serverId = request.getServerId();
+    long callId = request.getCallId();
     try {
+      name = JavaUtils.memoize(() -> cId + "->" + serverId);
       final NettyRpcProxy proxy = getProxies().getProxy(serverId);
       final RaftNettyServerRequestProto serverRequestProto = 
buildRequestProto(request);
-      return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+      final CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
+
+      proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
         if (request instanceof GroupListRequest) {
           return 
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
         } else if (request instanceof GroupInfoRequest) {
           return 
ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
         } else {
           return 
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
         }
+      }).thenCompose(raftReply -> {
+        final NotLeaderException nle = raftReply.getNotLeaderException();
+        if (nle != null) {
+          return failedFuture(nle);
+        }
+        final LeaderNotReadyException lnre = 
raftReply.getLeaderNotReadyException();
+        if (lnre != null) {
+          return failedFuture(lnre);
+        }
+        return CompletableFuture.completedFuture(raftReply);
+      }).whenComplete((raftReply, ex) -> {
+        if (ex != null) {
+          replyFuture.completeExceptionally(ex);
+        } else {
+          replyFuture.complete(raftReply);
+        }
       });
+
+      scheduler.onTimeout(requestTimeoutDuration,
+        () -> {
+          if (!replyFuture.isDone()) {
+            TimeoutIOException timeout = new TimeoutIOException(
+                getName()+ " request #" + callId + " timeout " +
+                requestTimeoutDuration.getDuration());
+            replyFuture.completeExceptionally(timeout);
+          }
+        }, LOG, () -> "Timeout check for client request #" + callId
+      );

Review Comment:
   Directly use `clientId + "->" + serverId` inside the lambda.
   ```java
         scheduler.onTimeout(requestTimeout, () -> {
           if (!replyFuture.isDone()) {
             final String s = clientId + "->" + serverId + " request #" + 
callId + " timeout " + requestTimeout.getDuration();
             replyFuture.completeExceptionally(new TimeoutIOException(s));
           }
         }, LOG, () -> "Timeout check for client request #" + callId);
   ```



##########
ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java:
##########
@@ -433,7 +438,7 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws 
Exception {
       Assert.assertFalse(replyFuture.isDone());
       // unblock append request.
       cluster.getServerAliveStream()
-          .filter(impl -> !impl.getInfo().isLeader())
+          .filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() 
!= id)

Review Comment:
   We should use `!equals(..)` instead of `!=`, since `!=` compares object references rather than the id values.



##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
 import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 
 public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(NettyClientRpc.class);
+
+  private Supplier<String> name;
+  private ClientId cId;
+  private final TimeDuration requestTimeoutDuration;
+  private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
   public NettyClientRpc(ClientId clientId, RaftProperties properties) {
     super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+    this.cId = clientId;
+    this.requestTimeoutDuration = 
RaftClientConfigKeys.Rpc.requestTimeout(properties);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest 
request) {
     final RaftPeerId serverId = request.getServerId();
+    long callId = request.getCallId();
     try {
+      name = JavaUtils.memoize(() -> cId + "->" + serverId);
       final NettyRpcProxy proxy = getProxies().getProxy(serverId);
       final RaftNettyServerRequestProto serverRequestProto = 
buildRequestProto(request);
-      return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+      final CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
+
+      proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
         if (request instanceof GroupListRequest) {
           return 
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
         } else if (request instanceof GroupInfoRequest) {
           return 
ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
         } else {
           return 
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
         }
+      }).thenCompose(raftReply -> {
+        final NotLeaderException nle = raftReply.getNotLeaderException();
+        if (nle != null) {
+          return failedFuture(nle);
+        }
+        final LeaderNotReadyException lnre = 
raftReply.getLeaderNotReadyException();
+        if (lnre != null) {
+          return failedFuture(lnre);
+        }
+        return CompletableFuture.completedFuture(raftReply);
+      }).whenComplete((raftReply, ex) -> {
+        if (ex != null) {
+          replyFuture.completeExceptionally(ex);
+        } else {
+          replyFuture.complete(raftReply);
+        }
       });
+
+      scheduler.onTimeout(requestTimeoutDuration,
+        () -> {
+          if (!replyFuture.isDone()) {
+            TimeoutIOException timeout = new TimeoutIOException(
+                getName()+ " request #" + callId + " timeout " +
+                requestTimeoutDuration.getDuration());
+            replyFuture.completeExceptionally(timeout);
+          }
+        }, LOG, () -> "Timeout check for client request #" + callId
+      );
+
+      return replyFuture;
     } catch (Throwable e) {
       return JavaUtils.completeExceptionally(e);
     }
   }
 
+  private String getName() {
+    return name.get();
+  }
+
+  private <T> CompletableFuture<T> failedFuture(Throwable ex) {
+    CompletableFuture<T> future = new CompletableFuture<>();
+    future.completeExceptionally(ex);
+    return future;
+  }

Review Comment:
   This method is not needed.  Just call `completeExceptionally(..)` on the `replyFuture` directly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to