This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 4a03d19 RATIS-1224. Define a CallId class. (#341)
4a03d19 is described below
commit 4a03d196b167d5e98a1719df7765aded7f562f12
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Dec 9 18:39:09 2020 +0800
RATIS-1224. Define a CallId class. (#341)
---
.../org/apache/ratis/client/impl/BlockingImpl.java | 3 +-
.../apache/ratis/client/impl/ClientProtoUtils.java | 10 ++---
.../ratis/client/impl/DataStreamClientImpl.java | 3 +-
.../ratis/client/impl/GroupManagementImpl.java | 9 +++--
.../org/apache/ratis/client/impl/OrderedAsync.java | 3 +-
.../apache/ratis/client/impl/RaftClientImpl.java | 14 +------
.../ratis/client/impl/RaftClientTestUtil.java | 3 +-
.../apache/ratis/client/impl/UnorderedAsync.java | 3 +-
.../src/main/java/org/apache/ratis/rpc/CallId.java | 46 ++++++++++++++++++++++
.../ratis/server/impl/RaftServerConstants.java | 3 --
.../apache/ratis/server/impl/ServerProtoUtils.java | 23 +++++------
.../ratis/server/leader/LogAppenderDefault.java | 4 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 7 ++--
13 files changed, 83 insertions(+), 48 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 4b85b11..7d03e62 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -34,6 +34,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +76,7 @@ class BlockingImpl implements BlockingApi {
Objects.requireNonNull(message, "message == null");
}
- final long callId = RaftClientImpl.nextCallId();
+ final long callId = CallId.getAndIncrement();
return sendRequestWithRetry(() -> client.newRaftClientRequest(server,
callId, message, type, null));
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 72dc1a2..7ef1290 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -29,6 +29,7 @@ import
org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.proto.RaftProtos.*;
@@ -47,18 +48,17 @@ import static
org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDe
public interface ClientProtoUtils {
static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
- ByteString requestorId, ByteString replyId, RaftGroupId groupId,
- long callId, boolean success) {
+ ByteString requestorId, ByteString replyId, RaftGroupId groupId, Long
callId, boolean success) {
return RaftRpcReplyProto.newBuilder()
.setRequestorId(requestorId)
.setReplyId(replyId)
.setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
- .setCallId(callId)
+ .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
.setSuccess(success);
}
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- ByteString requesterId, ByteString replyId, RaftGroupId groupId, long
callId,
+ ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long
callId,
SlidingWindowEntry slidingWindowEntry) {
if (slidingWindowEntry == null) {
slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
@@ -67,7 +67,7 @@ public interface ClientProtoUtils {
.setRequestorId(requesterId)
.setReplyId(replyId)
.setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
- .setCallId(callId)
+ .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
.setSlidingWindowEntry(slidingWindowEntry);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 86caa76..55506b9 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.protocol.*;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -180,7 +181,7 @@ public class DataStreamClientImpl implements
DataStreamClient {
final Message message =
Optional.ofNullable(headerMessage).map(ByteString::copyFrom).map(Message::valueOf).orElse(null);
RaftClientRequest request = new RaftClientRequest(clientId,
dataStreamServer.getId(), groupId,
- RaftClientImpl.nextCallId(), message,
RaftClientRequest.dataStreamRequestType(), null);
+ CallId.getAndIncrement(), message,
RaftClientRequest.dataStreamRequestType(), null);
return new DataStreamOutputImpl(request);
}
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
index e8566df..0171bcb 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/GroupManagementImpl.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.Preconditions;
import java.io.IOException;
@@ -45,7 +46,7 @@ class GroupManagementImpl implements GroupManagementApi {
public RaftClientReply add(RaftGroup newGroup) throws IOException {
Objects.requireNonNull(newGroup, "newGroup == null");
- final long callId = RaftClientImpl.nextCallId();
+ final long callId = CallId.getAndIncrement();
client.getClientRpc().addRaftPeers(newGroup.getPeers());
return
client.io().sendRequest(GroupManagementRequest.newAdd(client.getId(), server,
callId, newGroup));
}
@@ -55,14 +56,14 @@ class GroupManagementImpl implements GroupManagementApi {
throws IOException {
Objects.requireNonNull(groupId, "groupId == null");
- final long callId = RaftClientImpl.nextCallId();
+ final long callId = CallId.getAndIncrement();
return
client.io().sendRequest(GroupManagementRequest.newRemove(client.getId(), server,
callId, groupId, deleteDirectory, renameDirectory));
}
@Override
public GroupListReply list() throws IOException {
- final long callId = RaftClientImpl.nextCallId();
+ final long callId = CallId.getAndIncrement();
final RaftClientReply reply = client.io().sendRequest(
new GroupListRequest(client.getId(), server, client.getGroupId(),
callId));
Preconditions.assertTrue(reply instanceof GroupListReply, () ->
"Unexpected reply: " + reply);
@@ -74,7 +75,7 @@ class GroupManagementImpl implements GroupManagementApi {
if (groupId == null) {
groupId = client.getGroupId();
}
- final long callId = RaftClientImpl.nextCallId();
+ final long callId = CallId.getAndIncrement();
final RaftClientReply reply = client.io().sendRequest(
new GroupInfoRequest(client.getId(), server, groupId, callId));
Preconditions.assertTrue(reply instanceof GroupInfoReply, () ->
"Unexpected reply: " + reply);
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
index 57c4a8d..043181d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java
@@ -30,6 +30,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
@@ -163,7 +164,7 @@ public final class OrderedAsync {
"Interrupted when sending " + type + ", message=" + message, e));
}
- final long callId = RaftClientImpl.nextCallId();
+ final long callId = CallId.getAndIncrement();
final LongFunction<PendingOrderedRequest> constructor = seqNum -> new
PendingOrderedRequest(callId, seqNum,
slidingWindowEntry -> client.newRaftClientRequest(server, callId,
message, type, slidingWindowEntry));
return getSlidingWindow(server).submitNewRequest(constructor,
this::sendRequestWithRetry
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index d28ce40..0c70c88 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -41,6 +41,7 @@ import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
@@ -60,7 +61,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -68,12 +68,6 @@ import java.util.function.Supplier;
/** A client who sends requests to a raft service. */
public final class RaftClientImpl implements RaftClient {
- private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
-
- static long nextCallId() {
- return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
- }
-
public abstract static class PendingClientRequest {
private final long creationTimeInMs = System.currentTimeMillis();
private final CompletableFuture<RaftClientReply> replyFuture = new
CompletableFuture<>();
@@ -225,7 +219,7 @@ public final class RaftClientImpl implements RaftClient {
throws IOException {
Objects.requireNonNull(peersInNewConf, "peersInNewConf == null");
- final long callId = nextCallId();
+ final long callId = CallId.getAndIncrement();
// also refresh the rpc proxies for these peers
clientRpc.addRaftPeers(peersInNewConf);
return io().sendRequestWithRetry(() -> new SetConfigurationRequest(
@@ -344,10 +338,6 @@ public final class RaftClientImpl implements RaftClient {
}
}
- long getCallId() {
- return CALL_ID_COUNTER.get();
- }
-
@Override
public RaftClientRpc getClientRpc() {
return clientRpc;
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 6dc65e5..ba00b8f 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -23,6 +23,7 @@ import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.CallId;
/** Interface for testing raft client. */
public interface RaftClientTestUtil {
@@ -31,7 +32,7 @@ public interface RaftClientTestUtil {
}
static ClientInvocationId getClientInvocationId(RaftClient client) {
- return ClientInvocationId.valueOf(client.getId(),
((RaftClientImpl)client).getCallId());
+ return ClientInvocationId.valueOf(client.getId(), CallId.get());
}
static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId
server,
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index 7b7f158..43ed7a9 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public interface UnorderedAsync {
}
static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type,
RaftClientImpl client) {
- final long callId = RaftClientImpl.nextCallId();
+ final long callId = CallId.getAndIncrement();
final PendingClientRequest pending = new PendingUnorderedRequest(
() -> client.newRaftClientRequest(null, callId, null, type, null));
sendRequestWithRetry(pending, client);
diff --git a/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
new file mode 100644
index 0000000..a6914e2
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/rpc/CallId.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.rpc;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * A long ID for RPC calls.
+ *
+ * This class is threadsafe.
+ */
+public final class CallId {
+ private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+ /** @return the default value. */
+ public static long getDefault() {
+ return 0;
+ }
+
+ /** @return the current value. */
+ public static long get() {
+ return CALL_ID_COUNTER.get() & Long.MAX_VALUE;
+ }
+
+ /** @return the current value and then increment. */
+ public static long getAndIncrement() {
+ return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+ }
+
+ private CallId() {}
+}
\ No newline at end of file
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index 59023a6..bf1e9db 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -18,9 +18,6 @@
package org.apache.ratis.server.impl;
public final class RaftServerConstants {
- public static final long DEFAULT_CALLID = 0;
- public static final long DEFAULT_TERM = 0;
-
private RaftServerConstants() {
//Never constructed
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 86b6873..10049d2 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -38,9 +38,6 @@ import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_TERM;
-
/** Server proto utilities for internal use. */
public interface ServerProtoUtils {
static TermIndex toTermIndex(TermIndexProto p) {
@@ -195,10 +192,10 @@ public interface ServerProtoUtils {
return b.build();
}
- static LogEntryProto toLogEntryProto(RaftConfiguration conf, long term, long
index) {
- return LogEntryProto.newBuilder()
- .setTerm(term)
- .setIndex(index)
+ static LogEntryProto toLogEntryProto(RaftConfiguration conf, Long term, long
index) {
+ final LogEntryProto.Builder b = LogEntryProto.newBuilder();
+ Optional.ofNullable(term).ifPresent(b::setTerm);
+ return b.setIndex(index)
.setConfigurationEntry(toRaftConfigurationProto(conf))
.build();
}
@@ -321,7 +318,7 @@ public interface ServerProtoUtils {
static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success) {
return ClientProtoUtils.toRaftRpcReplyProtoBuilder(
- requestorId.toByteString(), replyId.getPeerId().toByteString(),
replyId.getGroupId(), DEFAULT_CALLID, success);
+ requestorId.toByteString(), replyId.getPeerId().toByteString(),
replyId.getGroupId(), null, success);
}
static RequestVoteReplyProto toRequestVoteReplyProto(
@@ -336,7 +333,7 @@ public interface ServerProtoUtils {
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
RaftGroupMemberId requestorId, RaftPeerId replyId) {
return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
- requestorId.getPeerId().toByteString(), replyId.toByteString(),
requestorId.getGroupId(), DEFAULT_CALLID, null);
+ requestorId.getPeerId().toByteString(), replyId.toByteString(),
requestorId.getGroupId(), null, null);
}
static RequestVoteRequestProto toRequestVoteRequestProto(
@@ -397,8 +394,8 @@ public interface ServerProtoUtils {
.addAllFileChunks(chunks)
.setTotalSize(totalSize)
.setDone(done);
- // Set term to DEFAULT_TERM as this term is not going to used by
installSnapshot to update the RaftConfiguration
- final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration,
DEFAULT_TERM,
+ // term is not going to used by installSnapshot to update the
RaftConfiguration
+ final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration,
null,
((RaftConfigurationImpl)raftConfiguration).getLogEntryIndex());
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
@@ -414,8 +411,8 @@ public interface ServerProtoUtils {
final InstallSnapshotRequestProto.NotificationProto.Builder
notificationProto =
InstallSnapshotRequestProto.NotificationProto.newBuilder()
.setFirstAvailableTermIndex(toTermIndexProto(firstAvailable));
- // Set term to DEFAULT_TERM as this term is not going to used by
installSnapshot to update the RaftConfiguration
- final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration,
DEFAULT_TERM,
+ // term is not going to used by installSnapshot to update the
RaftConfiguration
+ final LogEntryProto confLogEntryProto = toLogEntryProto(raftConfiguration,
null,
((RaftConfigurationImpl)raftConfiguration).getLogEntryIndex());
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index 5eb85d8..9a46fe6 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -22,6 +22,7 @@ import
org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -31,7 +32,6 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.UUID;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static
org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
/**
@@ -51,7 +51,7 @@ class LogAppenderDefault extends LogAppenderBase {
while (isRunning()) { // keep retrying for IOException
try {
if (request == null || request.getEntriesCount() == 0) {
- request = newAppendEntriesRequest(DEFAULT_CALLID, false);
+ request = newAppendEntriesRequest(CallId.getAndIncrement(), false);
}
if (request == null) {
diff --git
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 2167f7e..acf0735 100644
---
a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++
b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.CallId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.ServerFactory;
@@ -77,8 +78,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-
public abstract class MiniRaftCluster implements Closeable {
public static final Logger LOG =
LoggerFactory.getLogger(MiniRaftCluster.class);
@@ -714,7 +713,7 @@ public abstract class MiniRaftCluster implements Closeable {
public RaftClientRequest newRaftClientRequest(
ClientId clientId, RaftPeerId leaderId, Message message) {
- return newRaftClientRequest(clientId, leaderId, DEFAULT_CALLID, message);
+ return newRaftClientRequest(clientId, leaderId, CallId.getDefault(),
message);
}
public RaftClientRequest newRaftClientRequest(
@@ -726,7 +725,7 @@ public abstract class MiniRaftCluster implements Closeable {
public SetConfigurationRequest newSetConfigurationRequest(
ClientId clientId, RaftPeerId leaderId,
RaftPeer... peers) {
- return new SetConfigurationRequest(clientId, leaderId, getGroupId(),
DEFAULT_CALLID, Arrays.asList(peers));
+ return new SetConfigurationRequest(clientId, leaderId, getGroupId(),
CallId.getDefault(), Arrays.asList(peers));
}
public void setConfiguration(RaftPeer... peers) throws IOException {