This is an automated email from the ASF dual-hosted git repository.
williamsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7015ba2f2 RATIS-1916. OrderAsync does not call handReply. (#948)
7015ba2f2 is described below
commit 7015ba2f274394697dffec417b43374656077d88
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Oct 24 23:17:42 2023 -0700
RATIS-1916. OrderAsync does not call handReply. (#948)
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 80 ++++++----------------
.../org/apache/ratis/client/impl/OrderedAsync.java | 31 +++------
.../apache/ratis/client/impl/UnorderedAsync.java | 2 +-
.../java/org/apache/ratis/util/ProtoUtils.java | 3 +
4 files changed, 37 insertions(+), 79 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index b5b1d8abe..3c0f14fb7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -40,12 +40,10 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.RouteProto;
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
-import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.proto.RaftProtos.SnapshotCreateRequestProto;
import org.apache.ratis.proto.RaftProtos.SnapshotManagementRequestProto;
import org.apache.ratis.proto.RaftProtos.StateMachineExceptionProto;
import org.apache.ratis.proto.RaftProtos.TransferLeadershipRequestProto;
-import org.apache.ratis.proto.RaftProtos.WriteRequestTypeProto;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.GroupInfoReply;
@@ -110,55 +108,30 @@ public interface ClientProtoUtils {
.setSuccess(success);
}
- static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) {
- return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(),
- replyId.toByteString(), requestorId.getGroupId(), null, false, null, null, 0);
- }
-
- @SuppressWarnings("parameternumber")
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long callId, boolean toLeader,
- SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
- if (slidingWindowEntry == null) {
- slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
- }
-
- RaftRpcRequestProto.Builder b = RaftRpcRequestProto.newBuilder()
- .setRequestorId(requesterId)
- .setReplyId(replyId)
- .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
- .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
- .setToLeader(toLeader)
- .setSlidingWindowEntry(slidingWindowEntry)
- .setTimeoutMs(timeoutMs);
-
- if (routingTable != null) {
- b.setRoutingTable(routingTable.toProto());
- }
-
- return b;
+ ByteString requestorId, RaftPeerId replyId, RaftGroupId groupId) {
+ return RaftRpcRequestProto.newBuilder()
+ .setRequestorId(requestorId)
+ .setReplyId(replyId.toByteString())
+ .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId));
}
- @SuppressWarnings("parameternumber")
- static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId, boolean toLeader,
- SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
- return toRaftRpcRequestProtoBuilder(
- requesterId.toByteString(), replyId.toByteString(), groupId, callId, toLeader, slidingWindowEntry, routingTable,
- timeoutMs);
+ /** For server requests. */
+ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) {
+ return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(), replyId, requestorId.getGroupId());
}
- static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- RaftClientRequest request) {
- return toRaftRpcRequestProtoBuilder(
- request.getClientId(),
- request.getServerId(),
- request.getRaftGroupId(),
- request.getCallId(),
- request.isToLeader(),
- request.getSlidingWindowEntry(),
- request.getRoutingTable(),
- request.getTimeoutMs());
+ /** For client requests. */
+ static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftClientRequest request) {
+ final RaftRpcRequestProto.Builder b = toRaftRpcRequestProtoBuilder(
+ request.getClientId().toByteString(), request.getServerId(), request.getRaftGroupId());
+
+ Optional.ofNullable(request.getSlidingWindowEntry()).ifPresent(b::setSlidingWindowEntry);
+ Optional.ofNullable(request.getRoutingTable()).map(RoutingTable::toProto).ifPresent(b::setRoutingTable);
+
+ return b.setCallId(request.getCallId())
+ .setToLeader(request.isToLeader())
+ .setTimeoutMs(request.getTimeoutMs());
}
static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p) {
@@ -211,12 +184,14 @@ public interface ClientProtoUtils {
} else {
b.setServerId(perrId);
}
+ if (request.hasSlidingWindowEntry()) {
+ b.setSlidingWindowEntry(request.getSlidingWindowEntry());
+ }
return b.setClientId(ClientId.valueOf(request.getRequestorId()))
.setGroupId(ProtoUtils.toRaftGroupId(request.getRaftGroupId()))
.setCallId(request.getCallId())
.setMessage(toMessage(p.getMessage()))
.setType(type)
- .setSlidingWindowEntry(request.getSlidingWindowEntry())
.setRoutingTable(getRoutingTable(request))
.setTimeoutMs(request.getTimeoutMs())
.build();
@@ -264,17 +239,6 @@ public interface ClientProtoUtils {
return b.build();
}
- static RaftClientRequestProto toRaftClientRequestProto(
- ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
- long seqNum, ByteString content) {
- return RaftClientRequestProto.newBuilder()
- .setRpcRequest(toRaftRpcRequestProtoBuilder(
- clientId, serverId, groupId, callId, false, ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
- .setWrite(WriteRequestTypeProto.getDefaultInstance())
- .setMessage(toClientMessageEntryProtoBuilder(content))
- .build();
- }
-
static StateMachineExceptionProto.Builder toStateMachineExceptionProtoBuilder(StateMachineException e) {
final Throwable t = e.getCause() != null? e.getCause(): e;
return StateMachineExceptionProto.newBuilder()
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 5405d4906..a1aa58681 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -23,6 +23,7 @@ import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -172,7 +173,11 @@ public final class OrderedAsync {
).thenApply(reply -> RaftClientImpl.handleRaftException(reply, CompletionException::new)
).whenComplete((r, e) -> {
if (e != null) {
- LOG.error("Failed to send request, message=" + message, e);
+ if (e.getCause() instanceof AlreadyClosedException) {
+ LOG.error("Failed to send request, message=" + message + " due to " + e);
+ } else {
+ LOG.error("Failed to send request, message=" + message, e);
+ }
}
requestSemaphore.release();
});
@@ -195,17 +200,7 @@ public final class OrderedAsync {
}
final RetryPolicy retryPolicy = client.getRetryPolicy();
- sendRequest(pending).thenAccept(reply -> {
- if (f.isDone()) {
- return;
- }
- if (reply == null) {
- scheduleWithTimeout(pending, request, retryPolicy, null);
- } else {
- client.handleReply(request, reply);
- f.complete(reply);
- }
- }).exceptionally(e -> {
+ sendRequest(pending).exceptionally(e -> {
if (e instanceof CompletionException) {
e = JavaUtils.unwrapCompletionException(e);
scheduleWithTimeout(pending, request, retryPolicy, e);
@@ -235,25 +230,21 @@ public final class OrderedAsync {
private CompletableFuture<RaftClientReply> sendRequest(PendingOrderedRequest pending) {
final RetryPolicy retryPolicy = client.getRetryPolicy();
- final CompletableFuture<RaftClientReply> f;
final RaftClientRequest request;
if (getSlidingWindow((RaftPeerId) null).isFirst(pending.getSeqNum())) {
pending.setFirstRequest();
}
request = pending.newRequest();
LOG.debug("{}: send* {}", client.getId(), request);
- f = client.getClientRpc().sendRequestAsync(request);
- return f.thenApply(reply -> {
+ return client.getClientRpc().sendRequestAsync(request).thenApply(reply -> {
LOG.debug("{}: receive* {}", client.getId(), reply);
+ Objects.requireNonNull(reply, "reply == null");
+ client.handleReply(request, reply);
getSlidingWindow(request).receiveReply(
request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetry);
return reply;
}).exceptionally(e -> {
- if (LOG.isTraceEnabled()) {
- LOG.trace(client.getId() + ": Failed* " + request, e);
- } else {
- LOG.debug("{}: Failed* {} with {}", client.getId(), request, e);
- }
+ LOG.error(client.getId() + ": Failed* " + request, e);
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
pending.incrementExceptionCount(e);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index b053df172..84b817b58 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -103,7 +103,7 @@ public interface UnorderedAsync {
if (LOG.isTraceEnabled()) {
LOG.trace(clientId + ": attempt #" + attemptCount + " failed~ " + request, e);
} else {
- LOG.debug("{}: attempt #{} failed {} with {}", clientId, attemptCount, request, e);
+ LOG.debug("{}: attempt #{} failed {} with {}", clientId, attemptCount, request, e.toString());
}
e = JavaUtils.unwrapCompletionException(e);
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index e57ae552b..cd22ebe6b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -234,6 +234,9 @@ public interface ProtoUtils {
}
static String toString(SlidingWindowEntry proto) {
+ if (proto == null) {
+ return null;
+ }
return proto.getSeqNum() + (proto.getIsFirst()? "*": "");
}