This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f49943  RATIS-610. Add a builder for RaftClientReply. (#283)
6f49943 is described below

commit 6f49943a5f8b169f090fb5f6c6f3b8cbeae344a3
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Nov 18 11:46:04 2020 +0800

    RATIS-610. Add a builder for RaftClientReply. (#283)
    
    * RATIS-610. Add a builder for RaftClientReply.
    
    * Fix testAsyncConfiguration
---
 .../apache/ratis/client/impl/ClientProtoUtils.java |  52 +++++----
 .../org/apache/ratis/protocol/GroupInfoReply.java  |  24 ++---
 .../org/apache/ratis/protocol/GroupListReply.java  |  20 ++--
 .../apache/ratis/protocol/RaftClientMessage.java   |   7 +-
 .../org/apache/ratis/protocol/RaftClientReply.java | 118 +++++++++++++++------
 .../java/org/apache/ratis/util/Preconditions.java  |   6 ++
 .../grpc/client/GrpcClientProtocolService.java     |   7 +-
 .../ratis/netty/server/DataStreamManagement.java   |  15 ++-
 .../org/apache/ratis/server/impl/LeaderState.java  |  16 +--
 .../apache/ratis/server/impl/PendingRequest.java   |   9 +-
 .../apache/ratis/server/impl/PendingRequests.java  |   6 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  67 ++++++++----
 .../apache/ratis/server/impl/RaftServerProxy.java  |   9 +-
 .../test/java/org/apache/ratis/RaftAsyncTests.java |   1 +
 .../ratis/datastream/DataStreamBaseTest.java       |   7 +-
 .../ratis/datastream/TestDataStreamNetty.java      |   7 +-
 16 files changed, 244 insertions(+), 127 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 94c3d13..4498cff 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -284,35 +284,41 @@ public interface ClientProtoUtils {
     } else {
       e = null;
     }
-    ClientId clientId = ClientId.valueOf(rp.getRequestorId());
-    return new RaftClientReply(clientId, serverMemberId, rp.getCallId(), 
rp.getSuccess(),
-        toMessage(replyProto.getMessage()), e,
-        replyProto.getLogIndex(), replyProto.getCommitInfosList());
+
+    return RaftClientReply.newBuilder()
+        .setClientId(ClientId.valueOf(rp.getRequestorId()))
+        .setServerId(serverMemberId)
+        .setCallId(rp.getCallId())
+        .setSuccess(rp.getSuccess())
+        .setMessage(toMessage(replyProto.getMessage()))
+        .setException(e)
+        .setLogIndex(replyProto.getLogIndex())
+        .setCommitInfos(replyProto.getCommitInfosList())
+        .build();
   }
 
-  static GroupListReply toGroupListReply(
-      GroupListReplyProto replyProto) {
-    final RaftRpcReplyProto rp = replyProto.getRpcReply();
-    ClientId clientId = ClientId.valueOf(rp.getRequestorId());
-    final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
-    final List<RaftGroupId> groupInfos = replyProto.getGroupIdList().stream()
+  static GroupListReply toGroupListReply(GroupListReplyProto replyProto) {
+    final RaftRpcReplyProto rpc = replyProto.getRpcReply();
+    final List<RaftGroupId> groupIds = replyProto.getGroupIdList().stream()
         .map(ProtoUtils::toRaftGroupId)
         .collect(Collectors.toList());
-    return new GroupListReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
-        groupId, rp.getCallId(), rp.getSuccess(), groupInfos);
+    return new GroupListReply(ClientId.valueOf(rpc.getRequestorId()),
+        RaftPeerId.valueOf(rpc.getReplyId()),
+        ProtoUtils.toRaftGroupId(rpc.getRaftGroupId()),
+        rpc.getCallId(),
+        groupIds);
   }
 
-  static GroupInfoReply toGroupInfoReply(
-      GroupInfoReplyProto replyProto) {
-    final RaftRpcReplyProto rp = replyProto.getRpcReply();
-    ClientId clientId = ClientId.valueOf(rp.getRequestorId());
-    final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
-    final RaftGroup raftGroup = ProtoUtils.toRaftGroup(replyProto.getGroup());
-    RoleInfoProto role = replyProto.getRole();
-    boolean isRaftStorageHealthy = replyProto.getIsRaftStorageHealthy();
-    return new GroupInfoReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
-        groupId, rp.getCallId(), rp.getSuccess(), role, isRaftStorageHealthy,
-        replyProto.getCommitInfosList(), raftGroup);
+  static GroupInfoReply toGroupInfoReply(GroupInfoReplyProto replyProto) {
+    final RaftRpcReplyProto rpc = replyProto.getRpcReply();
+    return new GroupInfoReply(ClientId.valueOf(rpc.getRequestorId()),
+        RaftPeerId.valueOf(rpc.getReplyId()),
+        ProtoUtils.toRaftGroupId(rpc.getRaftGroupId()),
+        rpc.getCallId(),
+        replyProto.getCommitInfosList(),
+        ProtoUtils.toRaftGroup(replyProto.getGroup()),
+        replyProto.getRole(),
+        replyProto.getIsRaftStorageHealthy());
   }
 
   static Message toMessage(final ClientMessageEntryProto p) {
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
index 5cdb9fe..946bf23 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoReply.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -31,24 +31,20 @@ public class GroupInfoReply extends RaftClientReply {
   private final RoleInfoProto roleInfoProto;
   private final boolean isRaftStorageHealthy;
 
-  public GroupInfoReply(
-          RaftClientRequest request, RoleInfoProto roleInfoProto,
-          boolean isRaftStorageHealthy, Collection<CommitInfoProto> 
commitInfos, RaftGroup group) {
-    super(request, commitInfos);
-    this.roleInfoProto = roleInfoProto;
-    this.isRaftStorageHealthy = isRaftStorageHealthy;
-    this.group = group;
+  public GroupInfoReply(RaftClientRequest request, Collection<CommitInfoProto> 
commitInfos,
+      RaftGroup group, RoleInfoProto roleInfoProto, boolean 
isRaftStorageHealthy) {
+    this(request.getClientId(), request.getServerId(), 
request.getRaftGroupId(), request.getCallId(), commitInfos,
+        group, roleInfoProto, isRaftStorageHealthy);
   }
 
   @SuppressWarnings("parameternumber")
-  public GroupInfoReply(
-          ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
-          long callId, boolean success, RoleInfoProto roleInfoProto,
-          boolean isRaftStorageHealthy, Collection<CommitInfoProto> 
commitInfos, RaftGroup group) {
-    super(clientId, serverId, groupId, callId, success, null, null, 0L, 
commitInfos);
+  public GroupInfoReply(ClientId clientId, RaftPeerId serverId, RaftGroupId 
groupId, long callId,
+      Collection<CommitInfoProto> commitInfos,
+      RaftGroup group, RoleInfoProto roleInfoProto, boolean 
isRaftStorageHealthy) {
+    super(clientId, serverId, groupId, callId, true, null, null, 0L, 
commitInfos);
+    this.group = group;
     this.roleInfoProto = roleInfoProto;
     this.isRaftStorageHealthy = isRaftStorageHealthy;
-    this.group = group;
   }
 
   public RaftGroup getGroup() {
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java
index 5a9b000..bdc13f2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListReply.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,26 +17,24 @@
  */
 package org.apache.ratis.protocol;
 
+import java.util.Collections;
 import java.util.List;
 
 /**
- * The response of server information request. Sent from server to client.
+ * The response of group list request. Sent from server to client.
  */
 public class GroupListReply extends RaftClientReply {
 
   private final List<RaftGroupId> groupIds;
 
-  public GroupListReply(
-      RaftClientRequest request,  List<RaftGroupId> groupIds) {
-    super(request, null);
-    this.groupIds = groupIds;
+  public GroupListReply(RaftClientRequest request,  List<RaftGroupId> 
groupIds) {
+    this(request.getClientId(), request.getServerId(), 
request.getRaftGroupId(), request.getCallId(), groupIds);
   }
 
-  public GroupListReply(
-      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
-      long callId, boolean success, List<RaftGroupId> groupIds) {
-    super(clientId, serverId, groupId, callId, success, null, null, 0L, null);
-    this.groupIds = groupIds;
+  public GroupListReply(ClientId clientId, RaftPeerId serverId, RaftGroupId 
groupId, long callId,
+      List<RaftGroupId> groupIds) {
+    super(clientId, serverId, groupId, callId, true, null, null, 0L, null);
+    this.groupIds = Collections.unmodifiableList(groupIds);
   }
 
   public List<RaftGroupId> getGroupIds() {
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
index 3dfe6d3..8d3104a 100644
--- 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
+++ 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.protocol;
 
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
 
 public abstract class RaftClientMessage implements RaftRpcMessage {
   private final ClientId clientId;
@@ -26,9 +27,9 @@ public abstract class RaftClientMessage implements 
RaftRpcMessage {
   private final long callId;
 
   RaftClientMessage(ClientId clientId, RaftPeerId serverId, RaftGroupId 
groupId, long callId) {
-    this.clientId = clientId;
-    this.serverId = serverId;
-    this.groupId = groupId;
+    this.clientId = Preconditions.assertNotNull(clientId, "clientId");
+    this.serverId = Preconditions.assertNotNull(serverId, "serverId");
+    this.groupId = Preconditions.assertNotNull(groupId, "groupId");
     this.callId = callId;
   }
 
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java 
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 7973e0c..a45bd45 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -37,6 +37,93 @@ import java.util.Collections;
  * Reply from server to client
  */
 public class RaftClientReply extends RaftClientMessage {
+  /**
+   * To build {@link RaftClientReply}
+   */
+  public static class Builder {
+    private ClientId clientId;
+    private RaftPeerId serverId;
+    private RaftGroupId groupId;
+    private long callId;
+
+    private boolean success;
+    private Message message;
+    private RaftException exception;
+
+    private long logIndex;
+    private Collection<CommitInfoProto> commitInfos;
+
+    public RaftClientReply build() {
+      return new RaftClientReply(clientId, serverId, groupId, callId,
+          success, message, exception, logIndex, commitInfos);
+    }
+
+    public Builder setClientId(ClientId clientId) {
+      this.clientId = clientId;
+      return this;
+    }
+
+    public Builder setServerId(RaftPeerId serverId) {
+      this.serverId = serverId;
+      return this;
+    }
+
+    public Builder setGroupId(RaftGroupId groupId) {
+      this.groupId = groupId;
+      return this;
+    }
+
+    public Builder setCallId(long callId) {
+      this.callId = callId;
+      return this;
+    }
+
+    public Builder setSuccess(boolean success) {
+      this.success = success;
+      return this;
+    }
+
+    public Builder setSuccess() {
+      return setSuccess(true);
+    }
+
+    public Builder setException(RaftException exception) {
+      this.exception = exception;
+      return this;
+    }
+
+    public Builder setMessage(Message message) {
+      this.message = message;
+      return this;
+    }
+
+    public Builder setLogIndex(long logIndex) {
+      this.logIndex = logIndex;
+      return this;
+    }
+
+    public Builder setCommitInfos(Collection<CommitInfoProto> commitInfos) {
+      this.commitInfos = commitInfos;
+      return this;
+    }
+
+    public Builder setServerId(RaftGroupMemberId serverId) {
+      return setServerId(serverId.getPeerId())
+          .setGroupId(serverId.getGroupId());
+    }
+
+    public Builder setRequest(RaftClientRequest request) {
+      return setClientId(request.getClientId())
+          .setServerId(request.getServerId())
+          .setGroupId(request.getRaftGroupId())
+          .setCallId(request.getCallId());
+    }
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
   private final boolean success;
 
   /**
@@ -58,16 +145,7 @@ public class RaftClientReply extends RaftClientMessage {
   private final Collection<CommitInfoProto> commitInfos;
 
   @SuppressWarnings("parameternumber")
-  public RaftClientReply(ClientId clientId, RaftGroupMemberId serverId,
-      long callId, boolean success, Message message, RaftException exception,
-      long logIndex, Collection<CommitInfoProto> commitInfos) {
-    this(clientId, serverId.getPeerId(), serverId.getGroupId(),
-        callId, success, message, exception, logIndex, commitInfos);
-  }
-
-  @SuppressWarnings("parameternumber")
-  public RaftClientReply(
-      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+  RaftClientReply(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, boolean success, Message message, RaftException exception,
       long logIndex, Collection<CommitInfoProto> commitInfos) {
     super(clientId, serverId, groupId, callId);
@@ -88,26 +166,6 @@ public class RaftClientReply extends RaftClientMessage {
     }
   }
 
-  public RaftClientReply(RaftClientRequest request, RaftException exception, 
Collection<CommitInfoProto> commitInfos) {
-    this(request.getClientId(), request.getServerId(), 
request.getRaftGroupId(),
-        request.getCallId(), false, null, exception, 0L, commitInfos);
-  }
-
-  public RaftClientReply(RaftClientRequest request, 
Collection<CommitInfoProto> commitInfos) {
-    this(request, (Message) null, commitInfos);
-  }
-
-  public RaftClientReply(RaftClientRequest request, Message message, 
Collection<CommitInfoProto> commitInfos) {
-    this(request.getClientId(), request.getServerId(), 
request.getRaftGroupId(),
-        request.getCallId(), true, message, null, 0L, commitInfos);
-  }
-
-  public RaftClientReply(RaftClientRequest request, NotReplicatedException nre,
-      Collection<CommitInfoProto> commitInfos) {
-    this(request.getClientId(), request.getServerId(), 
request.getRaftGroupId(),
-        request.getCallId(), false, request.getMessage(), nre, 
nre.getLogIndex(), commitInfos);
-  }
-
   /**
    * Get the commit information for the entire group.
    * The commit information may be unavailable for exception reply.
diff --git 
a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java 
b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
index 4b57e7f..ce56a40 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/Preconditions.java
@@ -97,6 +97,12 @@ public interface Preconditions {
         + name + " = " + object + " == null, class = " + object.getClass());
   }
 
+  static <T> T assertInstanceOf(Object object, Class<T> clazz) {
+    assertTrue(clazz.isInstance(object),
+        () -> "Required instance of " + clazz + " but object.getClass() is " + 
object.getClass());
+    return clazz.cast(object);
+  }
+
   static <T> void assertUnique(Iterable<T> first) {
     assertUnique(first, Collections.emptyList());
   }
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index b71e50a..8248196 100644
--- 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -61,8 +61,11 @@ public class GrpcClientProtocolService extends 
RaftClientProtocolServiceImplBase
 
     @Override
     public void fail(Throwable t) {
-      Preconditions.assertTrue(t instanceof RaftException, () -> "Requires 
RaftException but " + t);
-      setReply(new RaftClientReply(request, (RaftException) t, null));
+      final RaftException e = Preconditions.assertInstanceOf(t, 
RaftException.class);
+      setReply(RaftClientReply.newBuilder()
+          .setRequest(request)
+          .setException(e)
+          .build());
     }
 
     @Override
diff --git 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 11923b0..81e4f68 100644
--- 
a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ 
b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -331,15 +331,20 @@ public class DataStreamManagement {
 
   static void replyDataStreamException(RaftServer server, Throwable cause, 
RaftClientRequest raftClientRequest,
       DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
-    DataStreamException dataStreamException = new 
DataStreamException(server.getId(), cause);
-    RaftClientReply reply = new RaftClientReply(raftClientRequest, 
dataStreamException, null);
+    final RaftClientReply reply = RaftClientReply.newBuilder()
+        .setRequest(raftClientRequest)
+        .setException(new DataStreamException(server.getId(), cause))
+        .build();
     sendDataStreamException(cause, request, reply, ctx);
   }
 
   void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf 
request, ChannelHandlerContext ctx) {
-    DataStreamException dataStreamException = new 
DataStreamException(server.getId(), cause);
-    RaftClientReply reply = new RaftClientReply(ClientId.emptyClientId(), 
server.getId(), RaftGroupId.emptyGroupId(),
-        -1, false, null, dataStreamException, 0L, null);
+    final RaftClientReply reply = RaftClientReply.newBuilder()
+        .setClientId(ClientId.emptyClientId())
+        .setServerId(server.getId())
+        .setGroupId(RaftGroupId.emptyGroupId())
+        .setException(new DataStreamException(server.getId(), cause))
+        .build();
     sendDataStreamException(cause, request, reply, ctx);
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index fa591cc..e30d480 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -360,7 +360,7 @@ public class LeaderState {
 
   CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
     return messageStreamRequests.streamAsync(request)
-        .thenApply(dummy -> new RaftClientReply(request, 
server.getCommitInfos()))
+        .thenApply(dummy -> server.newSuccessReply(request))
         .exceptionally(e -> exception2RaftClientReply(request, e));
   }
 
@@ -372,18 +372,22 @@ public class LeaderState {
   CompletableFuture<RaftClientReply> addWatchReqeust(RaftClientRequest 
request) {
     LOG.debug("{}: addWatchRequest {}", this, request);
     return watchRequests.add(request)
-        .thenApply(v -> new RaftClientReply(request, server.getCommitInfos()))
+        .thenApply(v -> server.newSuccessReply(request))
         .exceptionally(e -> exception2RaftClientReply(request, e));
   }
 
   private RaftClientReply exception2RaftClientReply(RaftClientRequest request, 
Throwable e) {
     e = JavaUtils.unwrapCompletionException(e);
     if (e instanceof NotReplicatedException) {
-      return new RaftClientReply(request, (NotReplicatedException)e, 
server.getCommitInfos());
+      final NotReplicatedException nre = (NotReplicatedException)e;
+      return server.newReplyBuilder(request)
+          .setException(nre)
+          .setLogIndex(nre.getLogIndex())
+          .build();
     } else if (e instanceof NotLeaderException) {
-      return new RaftClientReply(request, (NotLeaderException)e, 
server.getCommitInfos());
+      return server.newExceptionReply(request, (NotLeaderException)e);
     } else if (e instanceof LeaderNotReadyException) {
-      return new RaftClientReply(request, (LeaderNotReadyException)e, 
server.getCommitInfos());
+      return server.newExceptionReply(request, (LeaderNotReadyException)e);
     } else {
       throw new CompletionException(e);
     }
@@ -731,7 +735,7 @@ public class LeaderState {
       if (conf.isTransitional()) {
         replicateNewConf();
       } else { // the (new) log entry has been committed
-        pendingRequests.replySetConfiguration(server::getCommitInfos);
+        pendingRequests.replySetConfiguration(server::newSuccessReply);
         // if the leader is not included in the current configuration, step 
down
         if (!conf.containsInConf(server.getId())) {
           LOG.info("{} is not included in the new configuration {}. Will 
shutdown server...", this, conf);
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
index bb9d058..260dd0f 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequest.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.impl;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -41,7 +42,7 @@ public class PendingRequest implements 
Comparable<PendingRequest> {
   }
 
   PendingRequest(SetConfigurationRequest request) {
-    this(RaftServerConstants.INVALID_LOG_INDEX, request, null);
+    this(RaftLog.INVALID_LOG_INDEX, request, null);
   }
 
   long getIndex() {
@@ -74,7 +75,11 @@ public class PendingRequest implements 
Comparable<PendingRequest> {
   }
 
   TransactionContext setNotLeaderException(NotLeaderException nle, 
Collection<CommitInfoProto> commitInfos) {
-    setReply(new RaftClientReply(getRequest(), nle, commitInfos));
+    setReply(RaftClientReply.newBuilder()
+        .setRequest(getRequest())
+        .setException(nle)
+        .setCommitInfos(commitInfos)
+        .build());
     return getEntry();
   }
 
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 03e54a7..9892d06 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -44,7 +44,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.function.Supplier;
+import java.util.function.Function;
 
 class PendingRequests {
   public static final Logger LOG = 
LoggerFactory.getLogger(PendingRequests.class);
@@ -206,7 +206,7 @@ class PendingRequests {
     return pendingSetConf;
   }
 
-  void replySetConfiguration(Supplier<Collection<CommitInfoProto>> 
getCommitInfos) {
+  void replySetConfiguration(Function<RaftClientRequest, RaftClientReply> 
newSuccessReply) {
     // we allow the pendingRequest to be null in case that the new leader
     // commits the new configuration while it has not received the retry
     // request from the client
@@ -215,7 +215,7 @@ class PendingRequests {
       LOG.debug("{}: sends success for {}", name, request);
       // for setConfiguration we do not need to wait for statemachine. send 
back
       // reply after it's committed.
-      pendingSetConf.setReply(new RaftClientReply(request, 
getCommitInfos.get()));
+      pendingSetConf.setReply(newSuccessReply.apply(request));
       pendingSetConf = null;
     }
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2cfb4d3..3bb58f3 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -451,8 +451,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   }
 
   GroupInfoReply getGroupInfo(GroupInfoRequest request) {
-    return new GroupInfoReply(request, getRoleInfoProto(),
-        state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), 
getGroup());
+    return new GroupInfoReply(request, getCommitInfos(),
+        getGroup(), getRoleInfoProto(), 
state.getStorage().getStorageDir().hasMetaFile());
   }
 
   RoleInfoProto getRoleInfoProto() {
@@ -511,6 +511,33 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     return role + " " + state + " " + lifeCycle.getCurrentState();
   }
 
+  RaftClientReply.Builder newReplyBuilder(RaftClientRequest request) {
+    return RaftClientReply.newBuilder()
+        .setRequest(request)
+        .setCommitInfos(getCommitInfos());
+  }
+
+  private RaftClientReply.Builder newReplyBuilder(ClientId clientId, long 
callId, long logIndex) {
+    return RaftClientReply.newBuilder()
+        .setClientId(clientId)
+        .setCallId(callId)
+        .setLogIndex(logIndex)
+        .setServerId(getMemberId())
+        .setCommitInfos(getCommitInfos());
+  }
+
+  RaftClientReply newSuccessReply(RaftClientRequest request) {
+    return newReplyBuilder(request)
+        .setSuccess()
+        .build();
+  }
+
+  RaftClientReply newExceptionReply(RaftClientRequest request, RaftException 
exception) {
+    return newReplyBuilder(request)
+        .setException(exception)
+        .build();
+  }
+
   /**
    * @return null if the server is in leader state.
    */
@@ -524,7 +551,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
     if (!isLeader()) {
       NotLeaderException exception = generateNotLeaderException();
-      final RaftClientReply reply = new RaftClientReply(request, exception, 
getCommitInfos());
+      final RaftClientReply reply = newExceptionReply(request, exception);
       return RetryCache.failWithReply(reply, entry);
     }
     final LeaderState leaderState = role.getLeaderState().orElse(null);
@@ -534,7 +561,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
         return cacheEntry.getReplyFuture();
       }
       final LeaderNotReadyException lnre = new 
LeaderNotReadyException(getMemberId());
-      final RaftClientReply reply = new RaftClientReply(request, lnre, 
getCommitInfos());
+      final RaftClientReply reply = newExceptionReply(request, lnre);
       return RetryCache.failWithReply(reply, entry);
     }
     return null;
@@ -612,7 +639,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       } catch (StateMachineException e) {
         // the StateMachineException is thrown by the SM in the preAppend 
stage.
         // Return the exception in a RaftClientReply.
-        RaftClientReply exceptionReply = new RaftClientReply(request, e, 
getCommitInfos());
+        RaftClientReply exceptionReply = newExceptionReply(request, e);
         cacheEntry.failWithReply(exceptionReply);
         // leader will step down here
         if (isLeader()) {
@@ -695,8 +722,8 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
           // the state machine. We should call cancelTransaction() for failed 
requests
           TransactionContext context = stateMachine.startTransaction(request);
           if (context.getException() != null) {
-            RaftClientReply exceptionReply = new RaftClientReply(request,
-                new StateMachineException(getMemberId(), 
context.getException()), getCommitInfos());
+            final StateMachineException e = new 
StateMachineException(getMemberId(), context.getException());
+            final RaftClientReply exceptionReply = newExceptionReply(request, 
e);
             cacheEntry.failWithReply(exceptionReply);
             replyFuture =  CompletableFuture.completedFuture(exceptionReply);
           } else {
@@ -736,7 +763,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     return role.getLeaderState()
         .map(ls -> ls.addWatchReqeust(request))
         .orElseGet(() -> CompletableFuture.completedFuture(
-            new RaftClientReply(request, generateNotLeaderException(), 
getCommitInfos())));
+            newExceptionReply(request, generateNotLeaderException())));
   }
 
   private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest 
request) {
@@ -747,7 +774,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       final StaleReadException e = new StaleReadException(
           "Unable to serve stale-read due to server commit index = " + 
commitIndex + " < min = " + minIndex);
       return CompletableFuture.completedFuture(
-          new RaftClientReply(request, new 
StateMachineException(getMemberId(), e), getCommitInfos()));
+          newExceptionReply(request, new StateMachineException(getMemberId(), 
e)));
     }
     return processQueryFuture(stateMachine.queryStale(request.getMessage(), 
minIndex), request);
   }
@@ -756,7 +783,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     return role.getLeaderState()
         .map(ls -> ls.streamAsync(request))
         .orElseGet(() -> CompletableFuture.completedFuture(
-            new RaftClientReply(request, generateNotLeaderException(), 
getCommitInfos())));
+            newExceptionReply(request, generateNotLeaderException())));
   }
 
   private CompletableFuture<RaftClientRequest> 
streamEndOfRequestAsync(RaftClientRequest request) {
@@ -767,11 +794,11 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
 
   CompletableFuture<RaftClientReply> processQueryFuture(
       CompletableFuture<Message> queryFuture, RaftClientRequest request) {
-    return queryFuture.thenApply(r -> new RaftClientReply(request, r, 
getCommitInfos()))
+    return queryFuture.thenApply(r -> 
newReplyBuilder(request).setSuccess().setMessage(r).build())
         .exceptionally(e -> {
           e = JavaUtils.unwrapCompletionException(e);
           if (e instanceof StateMachineException) {
-            return new RaftClientReply(request, (StateMachineException)e, 
getCommitInfos());
+            return newExceptionReply(request, (StateMachineException)e);
           }
           throw new CompletionException(e);
         });
@@ -785,7 +812,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
   RaftClientReply waitForReply(RaftClientRequest request, 
CompletableFuture<RaftClientReply> future)
       throws IOException {
-    return waitForReply(getMemberId(), request, future, e -> new 
RaftClientReply(request, e, getCommitInfos()));
+    return waitForReply(getMemberId(), request, future, e -> 
newExceptionReply(request, e));
   }
 
   static <REPLY extends RaftClientReply> REPLY waitForReply(
@@ -853,7 +880,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       // return success with a null message if the new conf is the same as the 
current
       if (current.hasNoChange(peersInNewConf)) {
         pending = new PendingRequest(request);
-        pending.setReply(new RaftClientReply(request, getCommitInfos()));
+        pending.setReply(newSuccessReply(request));
         return pending.getFuture();
       }
 
@@ -1487,14 +1514,15 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
 
     final long logIndex = logEntry.getIndex();
     return stateMachineFuture.whenComplete((reply, exception) -> {
+      final RaftClientReply.Builder b = newReplyBuilder(clientId, callId, 
logIndex);
       final RaftClientReply r;
       if (exception == null) {
-        r = new RaftClientReply(clientId, getMemberId(), callId, true, reply, 
null, logIndex, getCommitInfos());
+        r = b.setSuccess().setMessage(reply).build();
       } else {
         // the exception is coming from the state machine. wrap it into the
         // reply as a StateMachineException
         final StateMachineException e = new 
StateMachineException(getMemberId(), exception);
-        r = new RaftClientReply(clientId, getMemberId(), callId, false, null, 
e, logIndex, getCommitInfos());
+        r = b.setException(e).build();
       }
 
       // update pending request
@@ -1563,10 +1591,9 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
       final long callId = smLog.getCallId();
       final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId, 
callId);
       if (cacheEntry != null) {
-        final RaftClientReply reply = new RaftClientReply(clientId, 
getMemberId(),
-            callId, false, null, generateNotLeaderException(),
-            logEntry.getIndex(), getCommitInfos());
-        cacheEntry.failWithReply(reply);
+        cacheEntry.failWithReply(newReplyBuilder(clientId, callId, 
logEntry.getIndex())
+            .setException(generateNotLeaderException())
+            .build());
       }
     }
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 9364811..d4b611c 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -379,7 +379,10 @@ public class RaftServerProxy implements RaftServer {
   @Override
   public RaftClientReply groupManagement(GroupManagementRequest request) 
throws IOException {
     return RaftServerImpl.waitForReply(getId(), request, 
groupManagementAsync(request),
-        e -> new RaftClientReply(request, e, null));
+        e -> RaftClientReply.newBuilder()
+            .setRequest(request)
+            .setException(e)
+            .build());
   }
 
   @Override
@@ -412,7 +415,7 @@ public class RaftServerProxy implements RaftServer {
           LOG.debug("{}: newImpl = {}", getId(), newImpl);
           final boolean started = newImpl.start();
           Preconditions.assertTrue(started, () -> getId()+ ": failed to start 
a new impl: " + newImpl);
-          return new RaftClientReply(request, newImpl.getCommitInfos());
+          return newImpl.newSuccessReply(request);
         }, implExecutor)
         .whenComplete((raftClientReply, throwable) -> {
           if (throwable != null) {
@@ -443,7 +446,7 @@ public class RaftServerProxy implements RaftServer {
     }
     return f.thenApply(impl -> {
       impl.groupRemove(deleteDirectory, renameDirectory);
-      return new RaftClientReply(request, impl.getCommitInfos());
+      return impl.newSuccessReply(request);
     });
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java 
b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index d78fbdc..2fd669c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -85,6 +85,7 @@ public abstract class RaftAsyncTests<CLUSTER extends 
MiniRaftCluster> extends Ba
   public void testAsyncConfiguration() throws IOException {
     LOG.info("Running testAsyncConfiguration");
     final RaftProperties properties = new RaftProperties();
+    RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(properties, 
false);
     RaftClient.Builder clientBuilder = RaftClient.newBuilder()
         .setRaftGroup(RaftGroup.emptyGroup())
         .setProperties(properties);
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 837e368..f075048 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -262,8 +262,11 @@ abstract class DataStreamBaseTest extends BaseTest {
         final MultiDataStreamStateMachine stateMachine = 
getStateMachine(request.getRaftGroupId());
         final SingleDataStream stream = 
stateMachine.getSingleDataStream(request.getCallId());
         Assert.assertFalse(stream.getWritableByteChannel().isOpen());
-        return CompletableFuture.completedFuture(new RaftClientReply(request,
-            () -> bytesWritten2ByteString(stream.getByteWritten()), null));
+        return CompletableFuture.completedFuture(RaftClientReply.newBuilder()
+            .setRequest(request)
+            .setSuccess()
+            .setMessage(() -> bytesWritten2ByteString(stream.getByteWritten()))
+            .build());
       }
 
       @Override
diff --git 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index bd9e6f0..dd035f1 100644
--- 
a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ 
b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -111,12 +111,13 @@ public class TestDataStreamNetty extends 
DataStreamBaseTest {
               final RaftClientRequest r = (RaftClientRequest) 
invocation.getArguments()[0];
               final RaftClientReply reply;
               if (isLeader) {
-                reply = leaderException != null? new RaftClientReply(r, 
leaderException, null)
-                    : new RaftClientReply(r, () -> MOCK, null);
+                final RaftClientReply.Builder b = 
RaftClientReply.newBuilder().setRequest(r);
+                reply = leaderException != null? 
b.setException(leaderException).build()
+                    : b.setSuccess().setMessage(() -> MOCK).build();
               } else {
                 final RaftGroupMemberId memberId = 
RaftGroupMemberId.valueOf(peerId, groupId);
                 final NotLeaderException notLeaderException = new 
NotLeaderException(memberId, suggestedLeader, null);
-                reply = new RaftClientReply(r, notLeaderException, null);
+                reply = 
RaftClientReply.newBuilder().setRequest(r).setException(notLeaderException).build();
               }
               return CompletableFuture.completedFuture(reply);
             });

Reply via email to