This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6f49943 RATIS-610. Add a builder for RaftClientReply. (#283)
6f49943 is described below
commit 6f49943a5f8b169f090fb5f6c6f3b8cbeae344a3
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Nov 18 11:46:04 2020 +0800
RATIS-610. Add a builder for RaftClientReply. (#283)
* RATIS-610. Add a builder for RaftClientReply.
* Fix testAsyncConfiguration
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 52 +++++----
.../org/apache/ratis/protocol/GroupInfoReply.java | 24 ++---
.../org/apache/ratis/protocol/GroupListReply.java | 20 ++--
.../apache/ratis/protocol/RaftClientMessage.java | 7 +-
.../org/apache/ratis/protocol/RaftClientReply.java | 118 +++++++++++++++------
.../java/org/apache/ratis/util/Preconditions.java | 6 ++
.../grpc/client/GrpcClientProtocolService.java | 7 +-
.../ratis/netty/server/DataStreamManagement.java | 15 ++-
.../org/apache/ratis/server/impl/LeaderState.java | 16 +--
.../apache/ratis/server/impl/PendingRequest.java | 9 +-
.../apache/ratis/server/impl/PendingRequests.java | 6 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 67 ++++++++----
.../apache/ratis/server/impl/RaftServerProxy.java | 9 +-
.../test/java/org/apache/ratis/RaftAsyncTests.java | 1 +
.../ratis/datastream/DataStreamBaseTest.java | 7 +-
.../ratis/datastream/TestDataStreamNetty.java | 7 +-
16 files changed, 244 insertions(+), 127 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 94c3d13..4498cff 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -284,35 +284,41 @@ public interface ClientProtoUtils {
} else {
e = null;
}
- ClientId clientId = ClientId.valueOf(rp.getRequestorId());
- return new RaftClientReply(clientId, serverMemberId, rp.getCallId(),
rp.getSuccess(),
- toMessage(replyProto.getMessage()), e,
- replyProto.getLogIndex(), replyProto.getCommitInfosList());
+
+ return RaftClientReply.newBuilder()
+ .setClientId(ClientId.valueOf(rp.getRequestorId()))
+ .setServerId(serverMemberId)
+ .setCallId(rp.getCallId())
+ .setSuccess(rp.getSuccess())
+ .setMessage(toMessage(replyProto.getMessage()))
+ .setException(e)
+ .setLogIndex(replyProto.getLogIndex())
+ .setCommitInfos(replyProto.getCommitInfosList())
+ .build();
}
- static GroupListReply toGroupListReply(
- GroupListReplyProto replyProto) {
- final RaftRpcReplyProto rp = replyProto.getRpcReply();
- ClientId clientId = ClientId.valueOf(rp.getRequestorId());
- final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
- final List<RaftGroupId> groupInfos = replyProto.getGroupIdList().stream()
+ static GroupListReply toGroupListReply(GroupListReplyProto replyProto) {
+ final RaftRpcReplyProto rpc = replyProto.getRpcReply();
+ final List<RaftGroupId> groupIds = replyProto.getGroupIdList().stream()
.map(ProtoUtils::toRaftGroupId)
.collect(Collectors.toList());
- return new GroupListReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
- groupId, rp.getCallId(), rp.getSuccess(), groupInfos);
+ return new GroupListReply(ClientId.valueOf(rpc.getRequestorId()),
+ RaftPeerId.valueOf(rpc.getReplyId()),
+ ProtoUtils.toRaftGroupId(rpc.getRaftGroupId()),
+ rpc.getCallId(),
+ groupIds);
}
- static GroupInfoReply toGroupInfoReply(
- GroupInfoReplyProto replyProto) {
- final RaftRpcReplyProto rp = replyProto.getRpcReply();
- ClientId clientId = ClientId.valueOf(rp.getRequestorId());
- final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
- final RaftGroup raftGroup = ProtoUtils.toRaftGroup(replyProto.getGroup());
- RoleInfoProto role = replyProto.getRole();
- boolean isRaftStorageHealthy = replyProto.getIsRaftStorageHealthy();
- return new GroupInfoReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
- groupId, rp.getCallId(), rp.getSuccess(), role, isRaftStorageHealthy,
- replyProto.getCommitInfosList(), raftGroup);
+ static GroupInfoReply toGroupInfoReply(GroupInfoReplyProto replyProto) {
+ final RaftRpcReplyProto rpc = replyProto.getRpcReply();
+ return new GroupInfoReply(ClientId.valueOf(rpc.getRequestorId()),
+ RaftPeerId.valueOf(rpc.getReplyId()),
+ ProtoUtils.toRaftGroupId(rpc.getRaftGroupId()),
+ rpc.getCallId(),
+ replyProto.getCommitInfosList(),
+ ProtoUtils.toRaftGroup(replyProto.getGroup()),
+ replyProto.getRole(),
+ replyProto.getIsRaftStorageHealthy());
}
static Message toMessage(final ClientMessageEntryProto p) {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
index 5cdb9fe..946bf23 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -31,24 +31,20 @@ public class GroupInfoReply extends RaftClientReply {
private final RoleInfoProto roleInfoProto;
private final boolean isRaftStorageHealthy;
- public GroupInfoReply(
- RaftClientRequest request, RoleInfoProto roleInfoProto,
- boolean isRaftStorageHealthy, Collection<CommitInfoProto>
commitInfos, RaftGroup group) {
- super(request, commitInfos);
- this.roleInfoProto = roleInfoProto;
- this.isRaftStorageHealthy = isRaftStorageHealthy;
- this.group = group;
+ public GroupInfoReply(RaftClientRequest request, Collection<CommitInfoProto>
commitInfos,
+ RaftGroup group, RoleInfoProto roleInfoProto, boolean
isRaftStorageHealthy) {
+ this(request.getClientId(), request.getServerId(),
request.getRaftGroupId(), request.getCallId(), commitInfos,
+ group, roleInfoProto, isRaftStorageHealthy);
}
@SuppressWarnings("parameternumber")
- public GroupInfoReply(
- ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
- long callId, boolean success, RoleInfoProto roleInfoProto,
- boolean isRaftStorageHealthy, Collection<CommitInfoProto>
commitInfos, RaftGroup group) {
- super(clientId, serverId, groupId, callId, success, null, null, 0L,
commitInfos);
+ public GroupInfoReply(ClientId clientId, RaftPeerId serverId, RaftGroupId
groupId, long callId,
+ Collection<CommitInfoProto> commitInfos,
+ RaftGroup group, RoleInfoProto roleInfoProto, boolean
isRaftStorageHealthy) {
+ super(clientId, serverId, groupId, callId, true, null, null, 0L,
commitInfos);
+ this.group = group;
this.roleInfoProto = roleInfoProto;
this.isRaftStorageHealthy = isRaftStorageHealthy;
- this.group = group;
}
public RaftGroup getGroup() {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java
index 5a9b000..bdc13f2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,26 +17,24 @@
*/
package org.apache.ratis.protocol;
+import java.util.Collections;
import java.util.List;
/**
- * The response of server information request. Sent from server to client.
+ * The response of group list request. Sent from server to client.
*/
public class GroupListReply extends RaftClientReply {
private final List<RaftGroupId> groupIds;
- public GroupListReply(
- RaftClientRequest request, List<RaftGroupId> groupIds) {
- super(request, null);
- this.groupIds = groupIds;
+ public GroupListReply(RaftClientRequest request, List<RaftGroupId>
groupIds) {
+ this(request.getClientId(), request.getServerId(),
request.getRaftGroupId(), request.getCallId(), groupIds);
}
- public GroupListReply(
- ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
- long callId, boolean success, List<RaftGroupId> groupIds) {
- super(clientId, serverId, groupId, callId, success, null, null, 0L, null);
- this.groupIds = groupIds;
+ public GroupListReply(ClientId clientId, RaftPeerId serverId, RaftGroupId
groupId, long callId,
+ List<RaftGroupId> groupIds) {
+ super(clientId, serverId, groupId, callId, true, null, null, 0L, null);
+ this.groupIds = Collections.unmodifiableList(groupIds);
}
public List<RaftGroupId> getGroupIds() {
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
index 3dfe6d3..8d3104a 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
@@ -18,6 +18,7 @@
package org.apache.ratis.protocol;
import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
public abstract class RaftClientMessage implements RaftRpcMessage {
private final ClientId clientId;
@@ -26,9 +27,9 @@ public abstract class RaftClientMessage implements
RaftRpcMessage {
private final long callId;
RaftClientMessage(ClientId clientId, RaftPeerId serverId, RaftGroupId
groupId, long callId) {
- this.clientId = clientId;
- this.serverId = serverId;
- this.groupId = groupId;
+ this.clientId = Preconditions.assertNotNull(clientId, "clientId");
+ this.serverId = Preconditions.assertNotNull(serverId, "serverId");
+ this.groupId = Preconditions.assertNotNull(groupId, "groupId");
this.callId = callId;
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 7973e0c..a45bd45 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -37,6 +37,93 @@ import java.util.Collections;
* Reply from server to client
*/
public class RaftClientReply extends RaftClientMessage {
+ /**
+ * To build {@link RaftClientReply}
+ */
+ public static class Builder {
+ private ClientId clientId;
+ private RaftPeerId serverId;
+ private RaftGroupId groupId;
+ private long callId;
+
+ private boolean success;
+ private Message message;
+ private RaftException exception;
+
+ private long logIndex;
+ private Collection<CommitInfoProto> commitInfos;
+
+ public RaftClientReply build() {
+ return new RaftClientReply(clientId, serverId, groupId, callId,
+ success, message, exception, logIndex, commitInfos);
+ }
+
+ public Builder setClientId(ClientId clientId) {
+ this.clientId = clientId;
+ return this;
+ }
+
+ public Builder setServerId(RaftPeerId serverId) {
+ this.serverId = serverId;
+ return this;
+ }
+
+ public Builder setGroupId(RaftGroupId groupId) {
+ this.groupId = groupId;
+ return this;
+ }
+
+ public Builder setCallId(long callId) {
+ this.callId = callId;
+ return this;
+ }
+
+ public Builder setSuccess(boolean success) {
+ this.success = success;
+ return this;
+ }
+
+ public Builder setSuccess() {
+ return setSuccess(true);
+ }
+
+ public Builder setException(RaftException exception) {
+ this.exception = exception;
+ return this;
+ }
+
+ public Builder setMessage(Message message) {
+ this.message = message;
+ return this;
+ }
+
+ public Builder setLogIndex(long logIndex) {
+ this.logIndex = logIndex;
+ return this;
+ }
+
+ public Builder setCommitInfos(Collection<CommitInfoProto> commitInfos) {
+ this.commitInfos = commitInfos;
+ return this;
+ }
+
+ public Builder setServerId(RaftGroupMemberId serverId) {
+ return setServerId(serverId.getPeerId())
+ .setGroupId(serverId.getGroupId());
+ }
+
+ public Builder setRequest(RaftClientRequest request) {
+ return setClientId(request.getClientId())
+ .setServerId(request.getServerId())
+ .setGroupId(request.getRaftGroupId())
+ .setCallId(request.getCallId());
+ }
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
private final boolean success;
/**
@@ -58,16 +145,7 @@ public class RaftClientReply extends RaftClientMessage {
private final Collection<CommitInfoProto> commitInfos;
@SuppressWarnings("parameternumber")
- public RaftClientReply(ClientId clientId, RaftGroupMemberId serverId,
- long callId, boolean success, Message message, RaftException exception,
- long logIndex, Collection<CommitInfoProto> commitInfos) {
- this(clientId, serverId.getPeerId(), serverId.getGroupId(),
- callId, success, message, exception, logIndex, commitInfos);
- }
-
- @SuppressWarnings("parameternumber")
- public RaftClientReply(
- ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+ RaftClientReply(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
long callId, boolean success, Message message, RaftException exception,
long logIndex, Collection<CommitInfoProto> commitInfos) {
super(clientId, serverId, groupId, callId);
@@ -88,26 +166,6 @@ public class RaftClientReply extends RaftClientMessage {
}
}
- public RaftClientReply(RaftClientRequest request, RaftException exception,
Collection<CommitInfoProto> commitInfos) {
- this(request.getClientId(), request.getServerId(),
request.getRaftGroupId(),
- request.getCallId(), false, null, exception, 0L, commitInfos);
- }
-
- public RaftClientReply(RaftClientRequest request,
Collection<CommitInfoProto> commitInfos) {
- this(request, (Message) null, commitInfos);
- }
-
- public RaftClientReply(RaftClientRequest request, Message message,
Collection<CommitInfoProto> commitInfos) {
- this(request.getClientId(), request.getServerId(),
request.getRaftGroupId(),
- request.getCallId(), true, message, null, 0L, commitInfos);
- }
-
- public RaftClientReply(RaftClientRequest request, NotReplicatedException nre,
- Collection<CommitInfoProto> commitInfos) {
- this(request.getClientId(), request.getServerId(),
request.getRaftGroupId(),
- request.getCallId(), false, request.getMessage(), nre,
nre.getLogIndex(), commitInfos);
- }
-
/**
* Get the commit information for the entire group.
* The commit information may be unavailable for exception reply.
diff --git
a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index 4b57e7f..ce56a40 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -97,6 +97,12 @@ public interface Preconditions {
+ name + " = " + object + " == null, class = " + object.getClass());
}
+ static <T> T assertInstanceOf(Object object, Class<T> clazz) {
+ assertTrue(clazz.isInstance(object),
+ () -> "Required instance of " + clazz + " but object.getClass() is " +
object.getClass());
+ return clazz.cast(object);
+ }
+
static <T> void assertUnique(Iterable<T> first) {
assertUnique(first, Collections.emptyList());
}
diff --git
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index b71e50a..8248196 100644
---
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -61,8 +61,11 @@ public class GrpcClientProtocolService extends
RaftClientProtocolServiceImplBase
@Override
public void fail(Throwable t) {
- Preconditions.assertTrue(t instanceof RaftException, () -> "Requires
RaftException but " + t);
- setReply(new RaftClientReply(request, (RaftException) t, null));
+ final RaftException e = Preconditions.assertInstanceOf(t,
RaftException.class);
+ setReply(RaftClientReply.newBuilder()
+ .setRequest(request)
+ .setException(e)
+ .build());
}
@Override
diff --git
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 11923b0..81e4f68 100644
---
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -331,15 +331,20 @@ public class DataStreamManagement {
static void replyDataStreamException(RaftServer server, Throwable cause,
RaftClientRequest raftClientRequest,
DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
- DataStreamException dataStreamException = new
DataStreamException(server.getId(), cause);
- RaftClientReply reply = new RaftClientReply(raftClientRequest,
dataStreamException, null);
+ final RaftClientReply reply = RaftClientReply.newBuilder()
+ .setRequest(raftClientRequest)
+ .setException(new DataStreamException(server.getId(), cause))
+ .build();
sendDataStreamException(cause, request, reply, ctx);
}
void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf
request, ChannelHandlerContext ctx) {
- DataStreamException dataStreamException = new
DataStreamException(server.getId(), cause);
- RaftClientReply reply = new RaftClientReply(ClientId.emptyClientId(),
server.getId(), RaftGroupId.emptyGroupId(),
- -1, false, null, dataStreamException, 0L, null);
+ final RaftClientReply reply = RaftClientReply.newBuilder()
+ .setClientId(ClientId.emptyClientId())
+ .setServerId(server.getId())
+ .setGroupId(RaftGroupId.emptyGroupId())
+ .setException(new DataStreamException(server.getId(), cause))
+ .build();
sendDataStreamException(cause, request, reply, ctx);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index fa591cc..e30d480 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -360,7 +360,7 @@ public class LeaderState {
CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
return messageStreamRequests.streamAsync(request)
- .thenApply(dummy -> new RaftClientReply(request,
server.getCommitInfos()))
+ .thenApply(dummy -> server.newSuccessReply(request))
.exceptionally(e -> exception2RaftClientReply(request, e));
}
@@ -372,18 +372,22 @@ public class LeaderState {
CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest
request) {
LOG.debug("{}: addWatchRequest {}", this, request);
return watchRequests.add(request)
- .thenApply(v -> new RaftClientReply(request, server.getCommitInfos()))
+ .thenApply(v -> server.newSuccessReply(request))
.exceptionally(e -> exception2RaftClientReply(request, e));
}
private RaftClientReply exception2RaftClientReply(RaftClientRequest request,
Throwable e) {
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof NotReplicatedException) {
- return new RaftClientReply(request, (NotReplicatedException)e,
server.getCommitInfos());
+ final NotReplicatedException nre = (NotReplicatedException)e;
+ return server.newReplyBuilder(request)
+ .setException(nre)
+ .setLogIndex(nre.getLogIndex())
+ .build();
} else if (e instanceof NotLeaderException) {
- return new RaftClientReply(request, (NotLeaderException)e,
server.getCommitInfos());
+ return server.newExceptionReply(request, (NotLeaderException)e);
} else if (e instanceof LeaderNotReadyException) {
- return new RaftClientReply(request, (LeaderNotReadyException)e,
server.getCommitInfos());
+ return server.newExceptionReply(request, (LeaderNotReadyException)e);
} else {
throw new CompletionException(e);
}
@@ -731,7 +735,7 @@ public class LeaderState {
if (conf.isTransitional()) {
replicateNewConf();
} else { // the (new) log entry has been committed
- pendingRequests.replySetConfiguration(server::getCommitInfos);
+ pendingRequests.replySetConfiguration(server::newSuccessReply);
// if the leader is not included in the current configuration, step
down
if (!conf.containsInConf(server.getId())) {
LOG.info("{} is not included in the new configuration {}. Will
shutdown server...", this, conf);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index bb9d058..260dd0f 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -41,7 +42,7 @@ public class PendingRequest implements
Comparable<PendingRequest> {
}
PendingRequest(SetConfigurationRequest request) {
- this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
+ this(RaftLog.INVALID_LOG_INDEX, request, null);
}
long getIndex() {
@@ -74,7 +75,11 @@ public class PendingRequest implements
Comparable<PendingRequest> {
}
TransactionContext setNotLeaderException(NotLeaderException nle,
Collection<CommitInfoProto> commitInfos) {
- setReply(new RaftClientReply(getRequest(), nle, commitInfos));
+ setReply(RaftClientReply.newBuilder()
+ .setRequest(getRequest())
+ .setException(nle)
+ .setCommitInfos(commitInfos)
+ .build());
return getEntry();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 03e54a7..9892d06 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -44,7 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.function.Supplier;
+import java.util.function.Function;
class PendingRequests {
public static final Logger LOG =
LoggerFactory.getLogger(PendingRequests.class);
@@ -206,7 +206,7 @@ class PendingRequests {
return pendingSetConf;
}
- void replySetConfiguration(Supplier<Collection<CommitInfoProto>>
getCommitInfos) {
+ void replySetConfiguration(Function<RaftClientRequest, RaftClientReply>
newSuccessReply) {
// we allow the pendingRequest to be null in case that the new leader
// commits the new configuration while it has not received the retry
// request from the client
@@ -215,7 +215,7 @@ class PendingRequests {
LOG.debug("{}: sends success for {}", name, request);
// for setConfiguration we do not need to wait for statemachine. send
back
// reply after it's committed.
- pendingSetConf.setReply(new RaftClientReply(request,
getCommitInfos.get()));
+ pendingSetConf.setReply(newSuccessReply.apply(request));
pendingSetConf = null;
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2cfb4d3..3bb58f3 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -451,8 +451,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
}
GroupInfoReply getGroupInfo(GroupInfoRequest request) {
- return new GroupInfoReply(request, getRoleInfoProto(),
- state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(),
getGroup());
+ return new GroupInfoReply(request, getCommitInfos(),
+ getGroup(), getRoleInfoProto(),
state.getStorage().getStorageDir().hasMetaFile());
}
RoleInfoProto getRoleInfoProto() {
@@ -511,6 +511,33 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
return role + " " + state + " " + lifeCycle.getCurrentState();
}
+ RaftClientReply.Builder newReplyBuilder(RaftClientRequest request) {
+ return RaftClientReply.newBuilder()
+ .setRequest(request)
+ .setCommitInfos(getCommitInfos());
+ }
+
+ private RaftClientReply.Builder newReplyBuilder(ClientId clientId, long
callId, long logIndex) {
+ return RaftClientReply.newBuilder()
+ .setClientId(clientId)
+ .setCallId(callId)
+ .setLogIndex(logIndex)
+ .setServerId(getMemberId())
+ .setCommitInfos(getCommitInfos());
+ }
+
+ RaftClientReply newSuccessReply(RaftClientRequest request) {
+ return newReplyBuilder(request)
+ .setSuccess()
+ .build();
+ }
+
+ RaftClientReply newExceptionReply(RaftClientRequest request, RaftException
exception) {
+ return newReplyBuilder(request)
+ .setException(exception)
+ .build();
+ }
+
/**
* @return null if the server is in leader state.
*/
@@ -524,7 +551,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
if (!isLeader()) {
NotLeaderException exception = generateNotLeaderException();
- final RaftClientReply reply = new RaftClientReply(request, exception,
getCommitInfos());
+ final RaftClientReply reply = newExceptionReply(request, exception);
return RetryCache.failWithReply(reply, entry);
}
final LeaderState leaderState = role.getLeaderState().orElse(null);
@@ -534,7 +561,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
return cacheEntry.getReplyFuture();
}
final LeaderNotReadyException lnre = new
LeaderNotReadyException(getMemberId());
- final RaftClientReply reply = new RaftClientReply(request, lnre,
getCommitInfos());
+ final RaftClientReply reply = newExceptionReply(request, lnre);
return RetryCache.failWithReply(reply, entry);
}
return null;
@@ -612,7 +639,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
} catch (StateMachineException e) {
// the StateMachineException is thrown by the SM in the preAppend
stage.
// Return the exception in a RaftClientReply.
- RaftClientReply exceptionReply = new RaftClientReply(request, e,
getCommitInfos());
+ RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
// leader will step down here
if (isLeader()) {
@@ -695,8 +722,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
// the state machine. We should call cancelTransaction() for failed
requests
TransactionContext context = stateMachine.startTransaction(request);
if (context.getException() != null) {
- RaftClientReply exceptionReply = new RaftClientReply(request,
- new StateMachineException(getMemberId(),
context.getException()), getCommitInfos());
+ final StateMachineException e = new
StateMachineException(getMemberId(), context.getException());
+ final RaftClientReply exceptionReply = newExceptionReply(request,
e);
cacheEntry.failWithReply(exceptionReply);
replyFuture = CompletableFuture.completedFuture(exceptionReply);
} else {
@@ -736,7 +763,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
return role.getLeaderState()
.map(ls -> ls.addWatchReqeust(request))
.orElseGet(() -> CompletableFuture.completedFuture(
- new RaftClientReply(request, generateNotLeaderException(),
getCommitInfos())));
+ newExceptionReply(request, generateNotLeaderException())));
}
private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest
request) {
@@ -747,7 +774,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
final StaleReadException e = new StaleReadException(
"Unable to serve stale-read due to server commit index = " +
commitIndex + " < min = " + minIndex);
return CompletableFuture.completedFuture(
- new RaftClientReply(request, new
StateMachineException(getMemberId(), e), getCommitInfos()));
+ newExceptionReply(request, new StateMachineException(getMemberId(),
e)));
}
return processQueryFuture(stateMachine.queryStale(request.getMessage(),
minIndex), request);
}
@@ -756,7 +783,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
return role.getLeaderState()
.map(ls -> ls.streamAsync(request))
.orElseGet(() -> CompletableFuture.completedFuture(
- new RaftClientReply(request, generateNotLeaderException(),
getCommitInfos())));
+ newExceptionReply(request, generateNotLeaderException())));
}
private CompletableFuture<RaftClientRequest>
streamEndOfRequestAsync(RaftClientRequest request) {
@@ -767,11 +794,11 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
CompletableFuture<RaftClientReply> processQueryFuture(
CompletableFuture<Message> queryFuture, RaftClientRequest request) {
- return queryFuture.thenApply(r -> new RaftClientReply(request, r,
getCommitInfos()))
+ return queryFuture.thenApply(r ->
newReplyBuilder(request).setSuccess().setMessage(r).build())
.exceptionally(e -> {
e = JavaUtils.unwrapCompletionException(e);
if (e instanceof StateMachineException) {
- return new RaftClientReply(request, (StateMachineException)e,
getCommitInfos());
+ return newExceptionReply(request, (StateMachineException)e);
}
throw new CompletionException(e);
});
@@ -785,7 +812,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
RaftClientReply waitForReply(RaftClientRequest request,
CompletableFuture<RaftClientReply> future)
throws IOException {
- return waitForReply(getMemberId(), request, future, e -> new
RaftClientReply(request, e, getCommitInfos()));
+ return waitForReply(getMemberId(), request, future, e ->
newExceptionReply(request, e));
}
static <REPLY extends RaftClientReply> REPLY waitForReply(
@@ -853,7 +880,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
// return success with a null message if the new conf is the same as the
current
if (current.hasNoChange(peersInNewConf)) {
pending = new PendingRequest(request);
- pending.setReply(new RaftClientReply(request, getCommitInfos()));
+ pending.setReply(newSuccessReply(request));
return pending.getFuture();
}
@@ -1487,14 +1514,15 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
final long logIndex = logEntry.getIndex();
return stateMachineFuture.whenComplete((reply, exception) -> {
+ final RaftClientReply.Builder b = newReplyBuilder(clientId, callId,
logIndex);
final RaftClientReply r;
if (exception == null) {
- r = new RaftClientReply(clientId, getMemberId(), callId, true, reply,
null, logIndex, getCommitInfos());
+ r = b.setSuccess().setMessage(reply).build();
} else {
// the exception is coming from the state machine. wrap it into the
// reply as a StateMachineException
final StateMachineException e = new
StateMachineException(getMemberId(), exception);
- r = new RaftClientReply(clientId, getMemberId(), callId, false, null,
e, logIndex, getCommitInfos());
+ r = b.setException(e).build();
}
// update pending request
@@ -1563,10 +1591,9 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
final long callId = smLog.getCallId();
final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId,
callId);
if (cacheEntry != null) {
- final RaftClientReply reply = new RaftClientReply(clientId,
getMemberId(),
- callId, false, null, generateNotLeaderException(),
- logEntry.getIndex(), getCommitInfos());
- cacheEntry.failWithReply(reply);
+ cacheEntry.failWithReply(newReplyBuilder(clientId, callId,
logEntry.getIndex())
+ .setException(generateNotLeaderException())
+ .build());
}
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 9364811..d4b611c 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -379,7 +379,10 @@ public class RaftServerProxy implements RaftServer {
@Override
public RaftClientReply groupManagement(GroupManagementRequest request)
throws IOException {
return RaftServerImpl.waitForReply(getId(), request,
groupManagementAsync(request),
- e -> new RaftClientReply(request, e, null));
+ e -> RaftClientReply.newBuilder()
+ .setRequest(request)
+ .setException(e)
+ .build());
}
@Override
@@ -412,7 +415,7 @@ public class RaftServerProxy implements RaftServer {
LOG.debug("{}: newImpl = {}", getId(), newImpl);
final boolean started = newImpl.start();
Preconditions.assertTrue(started, () -> getId()+ ": failed to start
a new impl: " + newImpl);
- return new RaftClientReply(request, newImpl.getCommitInfos());
+ return newImpl.newSuccessReply(request);
}, implExecutor)
.whenComplete((raftClientReply, throwable) -> {
if (throwable != null) {
@@ -443,7 +446,7 @@ public class RaftServerProxy implements RaftServer {
}
return f.thenApply(impl -> {
impl.groupRemove(deleteDirectory, renameDirectory);
- return new RaftClientReply(request, impl.getCommitInfos());
+ return impl.newSuccessReply(request);
});
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index d78fbdc..2fd669c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -85,6 +85,7 @@ public abstract class RaftAsyncTests<CLUSTER extends
MiniRaftCluster> extends Ba
public void testAsyncConfiguration() throws IOException {
LOG.info("Running testAsyncConfiguration");
final RaftProperties properties = new RaftProperties();
+ RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(properties,
false);
RaftClient.Builder clientBuilder = RaftClient.newBuilder()
.setRaftGroup(RaftGroup.emptyGroup())
.setProperties(properties);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 837e368..f075048 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -262,8 +262,11 @@ abstract class DataStreamBaseTest extends BaseTest {
final MultiDataStreamStateMachine stateMachine =
getStateMachine(request.getRaftGroupId());
final SingleDataStream stream =
stateMachine.getSingleDataStream(request.getCallId());
Assert.assertFalse(stream.getWritableByteChannel().isOpen());
- return CompletableFuture.completedFuture(new RaftClientReply(request,
- () -> bytesWritten2ByteString(stream.getByteWritten()), null));
+ return CompletableFuture.completedFuture(RaftClientReply.newBuilder()
+ .setRequest(request)
+ .setSuccess()
+ .setMessage(() -> bytesWritten2ByteString(stream.getByteWritten()))
+ .build());
}
@Override
diff --git
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index bd9e6f0..dd035f1 100644
---
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -111,12 +111,13 @@ public class TestDataStreamNetty extends
DataStreamBaseTest {
final RaftClientRequest r = (RaftClientRequest)
invocation.getArguments()[0];
final RaftClientReply reply;
if (isLeader) {
- reply = leaderException != null? new RaftClientReply(r,
leaderException, null)
- : new RaftClientReply(r, () -> MOCK, null);
+ final RaftClientReply.Builder b =
RaftClientReply.newBuilder().setRequest(r);
+ reply = leaderException != null?
b.setException(leaderException).build()
+ : b.setSuccess().setMessage(() -> MOCK).build();
} else {
final RaftGroupMemberId memberId =
RaftGroupMemberId.valueOf(peerId, groupId);
final NotLeaderException notLeaderException = new
NotLeaderException(memberId, suggestedLeader, null);
- reply = new RaftClientReply(r, notLeaderException, null);
+ reply =
RaftClientReply.newBuilder().setRequest(r).setException(notLeaderException).build();
}
return CompletableFuture.completedFuture(reply);
});