This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a03d19  RATIS-1224. Define a CallId class. (#341)
4a03d19 is described below

commit 4a03d196b167d5e98a1719df7765aded7f562f12
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Dec 9 18:39:09 2020 +0800

    RATIS-1224. Define a CallId class. (#341)
---
 .../org/apache/ratis/client/impl/BlockingImpl.java |  3 +-
 .../apache/ratis/client/impl/ClientProtoUtils.java | 10 ++---
 .../ratis/client/impl/DataStreamClientImpl.java    |  3 +-
 .../ratis/client/impl/GroupManagementImpl.java     |  9 +++--
 .../org/apache/ratis/client/impl/OrderedAsync.java |  3 +-
 .../apache/ratis/client/impl/RaftClientImpl.java   | 14 +------
 .../ratis/client/impl/RaftClientTestUtil.java      |  3 +-
 .../apache/ratis/client/impl/UnorderedAsync.java   |  3 +-
 .../src/main/java/org/apache/ratis/rpc/CallId.java | 46 ++++++++++++++++++++++
 .../ratis/server/impl/RaftServerConstants.java     |  3 --
 .../apache/ratis/server/impl/ServerProtoUtils.java | 23 +++++------
 .../ratis/server/leader/LogAppenderDefault.java    |  4 +-
 .../apache/ratis/server/impl/MiniRaftCluster.java  |  7 ++--
 13 files changed, 83 insertions(+), 48 deletions(-)

diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 4b85b11..7d03e62 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -34,6 +34,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ class BlockingImpl implements BlockingApi {
       Objects.requireNonNull(message, "message == null");
     }
 
-    final long callId = RaftClientImpl.nextCallId();
+    final long callId = CallId.getAndIncrement();
     return sendRequestWithRetry(() -> client.newRaftClientRequest(server, 
callId, message, type, null));
   }
 
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 72dc1a2..7ef1290 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -29,6 +29,7 @@ import 
org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.proto.RaftProtos.*;
@@ -47,18 +48,17 @@ import static 
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDe
 public interface ClientProtoUtils {
 
   static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
-      ByteString requestorId, ByteString replyId, RaftGroupId groupId,
-      long callId, boolean success) {
+      ByteString requestorId, ByteString replyId, RaftGroupId groupId, Long 
callId, boolean success) {
     return RaftRpcReplyProto.newBuilder()
         .setRequestorId(requestorId)
         .setReplyId(replyId)
         .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
-        .setCallId(callId)
+        .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
         .setSuccess(success);
   }
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ByteString requesterId, ByteString replyId, RaftGroupId groupId, long 
callId,
+      ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long 
callId,
       SlidingWindowEntry slidingWindowEntry) {
     if (slidingWindowEntry == null) {
       slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
@@ -67,7 +67,7 @@ public interface ClientProtoUtils {
         .setRequestorId(requesterId)
         .setReplyId(replyId)
         .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
-        .setCallId(callId)
+        .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
         .setSlidingWindowEntry(slidingWindowEntry);
   }
 
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 86caa76..55506b9 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -180,7 +181,7 @@ public class DataStreamClientImpl implements 
DataStreamClient {
     final Message message =
         
Optional.ofNullable(headerMessage).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
     RaftClientRequest request = new RaftClientRequest(clientId, 
dataStreamServer.getId(), groupId,
-        RaftClientImpl.nextCallId(), message, 
RaftClientRequest.dataStreamRequestType(), null);
+        CallId.getAndIncrement(), message, 
RaftClientRequest.dataStreamRequestType(), null);
     return new DataStreamOutputImpl(request);
   }
 
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index e8566df..0171bcb 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
@@ -45,7 +46,7 @@ class GroupManagementImpl implements GroupManagementApi {
   public RaftClientReply add(RaftGroup newGroup) throws IOException {
     Objects.requireNonNull(newGroup, "newGroup == null");
 
-    final long callId = RaftClientImpl.nextCallId();
+    final long callId = CallId.getAndIncrement();
     client.getClientRpc().addRaftPeers(newGroup.getPeers());
     return 
client.io().sendRequest(GroupManagementRequest.newAdd(client.getId(), server, 
callId, newGroup));
   }
@@ -55,14 +56,14 @@ class GroupManagementImpl implements GroupManagementApi {
       throws IOException {
     Objects.requireNonNull(groupId, "groupId == null");
 
-    final long callId = RaftClientImpl.nextCallId();
+    final long callId = CallId.getAndIncrement();
     return 
client.io().sendRequest(GroupManagementRequest.newRemove(client.getId(), server,
         callId, groupId, deleteDirectory, renameDirectory));
   }
 
   @Override
   public GroupListReply list() throws IOException {
-    final long callId = RaftClientImpl.nextCallId();
+    final long callId = CallId.getAndIncrement();
     final RaftClientReply reply = client.io().sendRequest(
         new GroupListRequest(client.getId(), server, client.getGroupId(), 
callId));
     Preconditions.assertTrue(reply instanceof GroupListReply, () -> 
"Unexpected reply: " + reply);
@@ -74,7 +75,7 @@ class GroupManagementImpl implements GroupManagementApi {
     if (groupId == null) {
       groupId = client.getGroupId();
     }
-    final long callId = RaftClientImpl.nextCallId();
+    final long callId = CallId.getAndIncrement();
     final RaftClientReply reply = client.io().sendRequest(
         new GroupInfoRequest(client.getId(), server, groupId, callId));
     Preconditions.assertTrue(reply instanceof GroupInfoReply, () -> 
"Unexpected reply: " + reply);
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 57c4a8d..043181d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -30,6 +30,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
@@ -163,7 +164,7 @@ public final class OrderedAsync {
           "Interrupted when sending " + type + ", message=" + message, e));
     }
 
-    final long callId = RaftClientImpl.nextCallId();
+    final long callId = CallId.getAndIncrement();
     final LongFunction<PendingOrderedRequest> constructor = seqNum -> new 
PendingOrderedRequest(callId, seqNum,
         slidingWindowEntry -> client.newRaftClientRequest(server, callId, 
message, type, slidingWindowEntry));
     return getSlidingWindow(server).submitNewRequest(constructor, 
this::sendRequestWithRetry
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index d28ce40..0c70c88 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -41,6 +41,7 @@ import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
 import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
@@ -60,7 +61,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -68,12 +68,6 @@ import java.util.function.Supplier;
 
 /** A client who sends requests to a raft service. */
 public final class RaftClientImpl implements RaftClient {
-  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
-  static long nextCallId() {
-    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
-  }
-
   public abstract static class PendingClientRequest {
     private final long creationTimeInMs = System.currentTimeMillis();
     private final CompletableFuture<RaftClientReply> replyFuture = new 
CompletableFuture<>();
@@ -225,7 +219,7 @@ public final class RaftClientImpl implements RaftClient {
       throws IOException {
     Objects.requireNonNull(peersInNewConf, "peersInNewConf == null");
 
-    final long callId = nextCallId();
+    final long callId = CallId.getAndIncrement();
     // also refresh the rpc proxies for these peers
     clientRpc.addRaftPeers(peersInNewConf);
     return io().sendRequestWithRetry(() -> new SetConfigurationRequest(
@@ -344,10 +338,6 @@ public final class RaftClientImpl implements RaftClient {
     }
   }
 
-  long getCallId() {
-    return CALL_ID_COUNTER.get();
-  }
-
   @Override
   public RaftClientRpc getClientRpc() {
     return clientRpc;
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 6dc65e5..ba00b8f 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.CallId;
 
 /** Interface for testing raft client. */
 public interface RaftClientTestUtil {
@@ -31,7 +32,7 @@ public interface RaftClientTestUtil {
   }
 
   static ClientInvocationId getClientInvocationId(RaftClient client) {
-    return ClientInvocationId.valueOf(client.getId(), 
((RaftClientImpl)client).getCallId());
+    return ClientInvocationId.valueOf(client.getId(), CallId.get());
   }
 
   static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId 
server,
diff --git 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 7b7f158..43ed7a9 100644
--- 
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ 
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public interface UnorderedAsync {
   }
 
   static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, 
RaftClientImpl client) {
-    final long callId = RaftClientImpl.nextCallId();
+    final long callId = CallId.getAndIncrement();
     final PendingClientRequest pending = new PendingUnorderedRequest(
         () -> client.newRaftClientRequest(null, callId, null, type, null));
     sendRequestWithRetry(pending, client);
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java 
b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
new file mode 100644
index 0000000..a6914e2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.rpc;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A generator of non-negative long IDs for RPC calls.
+ * All IDs come from a single static {@link AtomicLong} counter shared by every caller.
+ * This class is threadsafe.
+ */
+public final class CallId {
+  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+  /** @return the default call ID, which is always 0. */
+  public static long getDefault() {
+    return 0;
+  }
+
+  /** @return the current counter value, unchanged, with the sign bit masked off so it is never negative. */
+  public static long get() {
+    return CALL_ID_COUNTER.get() & Long.MAX_VALUE;
+  }
+
+  /** @return the current counter value (sign bit masked off), atomically incrementing the counter afterwards. */
+  public static long getAndIncrement() {
+    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+  }
+
+  private CallId() {}
+}
\ No newline at end of file
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index 59023a6..bf1e9db 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -18,9 +18,6 @@
 package org.apache.ratis.server.impl;
 
 public final class RaftServerConstants {
-  public static final long DEFAULT_CALLID = 0;
-  public static final long DEFAULT_TERM = 0;
-
   private RaftServerConstants() {
     //Never constructed
   }
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 86b6873..10049d2 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -38,9 +38,6 @@ import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_TERM;
-
 /** Server proto utilities for internal use. */
 public interface ServerProtoUtils {
   static TermIndex toTermIndex(TermIndexProto p) {
@@ -195,10 +192,10 @@ public interface ServerProtoUtils {
     return b.build();
   }
 
-  static LogEntryProto toLogEntryProto(RaftConfiguration conf, long term, long 
index) {
-    return LogEntryProto.newBuilder()
-        .setTerm(term)
-        .setIndex(index)
+  static LogEntryProto toLogEntryProto(RaftConfiguration conf, Long term, long 
index) {
+    final LogEntryProto.Builder b = LogEntryProto.newBuilder();
+    Optional.ofNullable(term).ifPresent(b::setTerm);
+    return b.setIndex(index)
         .setConfigurationEntry(toRaftConfigurationProto(conf))
         .build();
   }
@@ -321,7 +318,7 @@ public interface ServerProtoUtils {
   static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
       RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success) {
     return ClientProtoUtils.toRaftRpcReplyProtoBuilder(
-        requestorId.toByteString(), replyId.getPeerId().toByteString(), 
replyId.getGroupId(), DEFAULT_CALLID, success);
+        requestorId.toByteString(), replyId.getPeerId().toByteString(), 
replyId.getGroupId(), null, success);
   }
 
   static RequestVoteReplyProto toRequestVoteReplyProto(
@@ -336,7 +333,7 @@ public interface ServerProtoUtils {
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
       RaftGroupMemberId requestorId, RaftPeerId replyId) {
     return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
-        requestorId.getPeerId().toByteString(), replyId.toByteString(), 
requestorId.getGroupId(), DEFAULT_CALLID, null);
+        requestorId.getPeerId().toByteString(), replyId.toByteString(), 
requestorId.getGroupId(), null, null);
   }
 
   static RequestVoteRequestProto toRequestVoteRequestProto(
@@ -397,8 +394,8 @@ public interface ServerProtoUtils {
             .addAllFileChunks(chunks)
             .setTotalSize(totalSize)
             .setDone(done);
-    // Set term to DEFAULT_TERM as this term is not going to used by 
installSnapshot to update the RaftConfiguration
-    final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration, 
DEFAULT_TERM,
+    // term is not going to be used by installSnapshot to update the 
RaftConfiguration
+    final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration, 
null,
         ((RaftConfigurationImpl)raftConfiguration).getLogEntryIndex());
     return InstallSnapshotRequestProto.newBuilder()
         .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
@@ -414,8 +411,8 @@ public interface ServerProtoUtils {
     final InstallSnapshotRequestProto.NotificationProto.Builder 
notificationProto =
         InstallSnapshotRequestProto.NotificationProto.newBuilder()
             .setFirstAvailableTermIndex(toTermIndexProto(firstAvailable));
-    // Set term to DEFAULT_TERM as this term is not going to used by 
installSnapshot to update the RaftConfiguration
-    final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration, 
DEFAULT_TERM,
+    // term is not going to be used by installSnapshot to update the 
RaftConfiguration
+    final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration, 
null,
         ((RaftConfigurationImpl)raftConfiguration).getLogEntryIndex());
     return InstallSnapshotRequestProto.newBuilder()
         .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 5eb85d8..9a46fe6 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -22,6 +22,7 @@ import 
org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -31,7 +32,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.UUID;
 
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static 
org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
 
 /**
@@ -51,7 +51,7 @@ class LogAppenderDefault extends LogAppenderBase {
     while (isRunning()) { // keep retrying for IOException
       try {
         if (request == null || request.getEntriesCount() == 0) {
-          request = newAppendEntriesRequest(DEFAULT_CALLID, false);
+          request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
         }
 
         if (request == null) {
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 2167f7e..acf0735 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.retry.RetryPolicies;
 import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.ServerFactory;
@@ -77,8 +78,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-
 public abstract class MiniRaftCluster implements Closeable {
   public static final Logger LOG = 
LoggerFactory.getLogger(MiniRaftCluster.class);
 
@@ -714,7 +713,7 @@ public abstract class MiniRaftCluster implements Closeable {
 
   public RaftClientRequest newRaftClientRequest(
       ClientId clientId, RaftPeerId leaderId, Message message) {
-    return newRaftClientRequest(clientId, leaderId, DEFAULT_CALLID, message);
+    return newRaftClientRequest(clientId, leaderId, CallId.getDefault(), 
message);
   }
 
   public RaftClientRequest newRaftClientRequest(
@@ -726,7 +725,7 @@ public abstract class MiniRaftCluster implements Closeable {
   public SetConfigurationRequest newSetConfigurationRequest(
       ClientId clientId, RaftPeerId leaderId,
       RaftPeer... peers) {
-    return new SetConfigurationRequest(clientId, leaderId, getGroupId(), 
DEFAULT_CALLID, Arrays.asList(peers));
+    return new SetConfigurationRequest(clientId, leaderId, getGroupId(), 
CallId.getDefault(), Arrays.asList(peers));
   }
 
   public void setConfiguration(RaftPeer... peers) throws IOException {

Reply via email to