szetszwo commented on code in PR #1264:
URL: https://github.com/apache/ratis/pull/1264#discussion_r2094649885
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(NettyClientRpc.class);
+
+ private Supplier<String> name;
+ private ClientId cId;
Review Comment:
Let's rename it to `clientId`. We sometimes use cid for callId.
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(NettyClientRpc.class);
+
+ private Supplier<String> name;
Review Comment:
This field is not needed. Just pass the name inside the method.
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(NettyClientRpc.class);
+
+ private Supplier<String> name;
+ private ClientId cId;
+ private final TimeDuration requestTimeoutDuration;
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+ this.cId = clientId;
+ this.requestTimeoutDuration =
RaftClientConfigKeys.Rpc.requestTimeout(properties);
}
@Override
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest
request) {
final RaftPeerId serverId = request.getServerId();
+ long callId = request.getCallId();
try {
+ name = JavaUtils.memoize(() -> cId + "->" + serverId);
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
final RaftNettyServerRequestProto serverRequestProto =
buildRequestProto(request);
- return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+ final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
+
+ proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
if (request instanceof GroupListRequest) {
return
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
} else if (request instanceof GroupInfoRequest) {
return
ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
} else {
return
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
}
+ }).thenCompose(raftReply -> {
+ final NotLeaderException nle = raftReply.getNotLeaderException();
+ if (nle != null) {
+ return failedFuture(nle);
+ }
+ final LeaderNotReadyException lnre =
raftReply.getLeaderNotReadyException();
+ if (lnre != null) {
+ return failedFuture(lnre);
+ }
+ return CompletableFuture.completedFuture(raftReply);
+ }).whenComplete((raftReply, ex) -> {
+ if (ex != null) {
+ replyFuture.completeExceptionally(ex);
+ } else {
+ replyFuture.complete(raftReply);
+ }
Review Comment:
We should combine them:
```java
if (e == null) {
if (reply == null) {
e = new NullPointerException("Both reply==null && e==null");
}
if (e == null) {
e = reply.getNotLeaderException();
}
if (e == null) {
e = reply.getLeaderNotReadyException();
}
}
if (e != null) {
replyFuture.completeExceptionally(e);
} else {
replyFuture.complete(reply);
}
```
##########
ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java:
##########
@@ -417,13 +422,13 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws
Exception {
LOG.info("Running testAppendEntriesTimeout");
final TimeDuration oldExpiryTime =
RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(),
TimeDuration.valueOf(20, TimeUnit.SECONDS));
- waitForLeader(cluster);
+ RaftPeerId id = waitForLeader(cluster).getId();
long time = System.currentTimeMillis();
long waitTime = 5000;
try (final RaftClient client = cluster.createClient()) {
// block append requests
cluster.getServerAliveStream()
- .filter(impl -> !impl.getInfo().isLeader())
+ .filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId()
!= id)
Review Comment:
We should use `equals(..)` instead of `!=`
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(NettyClientRpc.class);
+
+ private Supplier<String> name;
+ private ClientId cId;
+ private final TimeDuration requestTimeoutDuration;
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+ this.cId = clientId;
+ this.requestTimeoutDuration =
RaftClientConfigKeys.Rpc.requestTimeout(properties);
}
@Override
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest
request) {
final RaftPeerId serverId = request.getServerId();
+ long callId = request.getCallId();
try {
+ name = JavaUtils.memoize(() -> cId + "->" + serverId);
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
final RaftNettyServerRequestProto serverRequestProto =
buildRequestProto(request);
- return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+ final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
+
+ proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
if (request instanceof GroupListRequest) {
return
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
} else if (request instanceof GroupInfoRequest) {
return
ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
} else {
return
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
}
+ }).thenCompose(raftReply -> {
+ final NotLeaderException nle = raftReply.getNotLeaderException();
+ if (nle != null) {
+ return failedFuture(nle);
+ }
+ final LeaderNotReadyException lnre =
raftReply.getLeaderNotReadyException();
+ if (lnre != null) {
+ return failedFuture(lnre);
+ }
+ return CompletableFuture.completedFuture(raftReply);
+ }).whenComplete((raftReply, ex) -> {
+ if (ex != null) {
+ replyFuture.completeExceptionally(ex);
+ } else {
+ replyFuture.complete(raftReply);
+ }
});
+
+ scheduler.onTimeout(requestTimeoutDuration,
+ () -> {
+ if (!replyFuture.isDone()) {
+ TimeoutIOException timeout = new TimeoutIOException(
+ getName()+ " request #" + callId + " timeout " +
+ requestTimeoutDuration.getDuration());
+ replyFuture.completeExceptionally(timeout);
+ }
+ }, LOG, () -> "Timeout check for client request #" + callId
+ );
Review Comment:
Directly use `clientId + "->" + serverId` inside lambda.
```java
scheduler.onTimeout(requestTimeout, () -> {
if (!replyFuture.isDone()) {
final String s = clientId + "->" + serverId + " request #" +
callId + " timeout " + requestTimeout.getDuration();
replyFuture.completeExceptionally(new TimeoutIOException(s));
}
}, LOG, () -> "Timeout check for client request #" + callId);
```
##########
ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java:
##########
@@ -433,7 +438,7 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws
Exception {
Assert.assertFalse(replyFuture.isDone());
// unblock append request.
cluster.getServerAliveStream()
- .filter(impl -> !impl.getInfo().isLeader())
+ .filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId()
!= id)
Review Comment:
We should use `equals(..)` instead of `!=`.
##########
ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java:
##########
@@ -28,36 +29,97 @@
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
+
+ public static final Logger LOG =
LoggerFactory.getLogger(NettyClientRpc.class);
+
+ private Supplier<String> name;
+ private ClientId cId;
+ private final TimeDuration requestTimeoutDuration;
+ private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
+ this.cId = clientId;
+ this.requestTimeoutDuration =
RaftClientConfigKeys.Rpc.requestTimeout(properties);
}
@Override
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest
request) {
final RaftPeerId serverId = request.getServerId();
+ long callId = request.getCallId();
try {
+ name = JavaUtils.memoize(() -> cId + "->" + serverId);
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
final RaftNettyServerRequestProto serverRequestProto =
buildRequestProto(request);
- return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
+ final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
+
+ proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
if (request instanceof GroupListRequest) {
return
ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
} else if (request instanceof GroupInfoRequest) {
return
ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
} else {
return
ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
}
+ }).thenCompose(raftReply -> {
+ final NotLeaderException nle = raftReply.getNotLeaderException();
+ if (nle != null) {
+ return failedFuture(nle);
+ }
+ final LeaderNotReadyException lnre =
raftReply.getLeaderNotReadyException();
+ if (lnre != null) {
+ return failedFuture(lnre);
+ }
+ return CompletableFuture.completedFuture(raftReply);
+ }).whenComplete((raftReply, ex) -> {
+ if (ex != null) {
+ replyFuture.completeExceptionally(ex);
+ } else {
+ replyFuture.complete(raftReply);
+ }
});
+
+ scheduler.onTimeout(requestTimeoutDuration,
+ () -> {
+ if (!replyFuture.isDone()) {
+ TimeoutIOException timeout = new TimeoutIOException(
+ getName()+ " request #" + callId + " timeout " +
+ requestTimeoutDuration.getDuration());
+ replyFuture.completeExceptionally(timeout);
+ }
+ }, LOG, () -> "Timeout check for client request #" + callId
+ );
+
+ return replyFuture;
} catch (Throwable e) {
return JavaUtils.completeExceptionally(e);
}
}
+ private String getName() {
+ return name.get();
+ }
+
+ private <T> CompletableFuture<T> failedFuture(Throwable ex) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(ex);
+ return future;
+ }
Review Comment:
This method is not needed. Just `completeExceptionally` the `replyFuture`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]