This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 16b6536e2 RATIS-2025. Move out assert and proto methods from
RaftServerImpl. (#1041)
16b6536e2 is described below
commit 16b6536e220ccbc8f9228b4fccfab8eb823ee83e
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Feb 7 09:20:06 2024 -0800
RATIS-2025. Move out assert and proto methods from RaftServerImpl. (#1041)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 233 ++++++++-------------
.../org/apache/ratis/server/impl/RoleInfo.java | 57 +++++
.../apache/ratis/server/impl/ServerImplUtils.java | 55 ++++-
.../apache/ratis/server/impl/ServerProtoUtils.java | 4 +
.../server/impl/SnapshotInstallationHandler.java | 2 +-
5 files changed, 205 insertions(+), 146 deletions(-)
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 7390093c3..133cfebdc 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -17,36 +17,30 @@
*/
package org.apache.ratis.server.impl;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.NoSuchFileException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Function;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.metrics.Timekeeper;
-import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
+import org.apache.ratis.proto.RaftProtos.RaftConfigurationProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexReplyProto;
+import org.apache.ratis.proto.RaftProtos.ReadIndexRequestProto;
+import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
+import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
@@ -64,7 +58,20 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.TransferLeadershipRequest;
-import org.apache.ratis.protocol.exceptions.*;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.ReadIndexException;
+import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
+import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
+import org.apache.ratis.protocol.exceptions.SetConfigurationException;
+import org.apache.ratis.protocol.exceptions.StaleReadException;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.DivisionProperties;
@@ -74,7 +81,6 @@ import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.LeaderElection.Phase;
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
import org.apache.ratis.server.leader.LeaderState;
-import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.metrics.RaftServerMetricsImpl;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
@@ -85,7 +91,6 @@ import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
-import org.apache.ratis.server.util.ServerStringUtils;
import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
@@ -105,13 +110,43 @@ import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedSupplier;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.ratis.server.impl.ServerImplUtils.assertEntries;
+import static org.apache.ratis.server.impl.ServerImplUtils.assertGroup;
import static
org.apache.ratis.server.impl.ServerImplUtils.effectiveCommitIndex;
import static
org.apache.ratis.server.impl.ServerProtoUtils.toAppendEntriesReplyProto;
+import static
org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexReplyProto;
+import static
org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexRequestProto;
+import static
org.apache.ratis.server.impl.ServerProtoUtils.toRequestVoteReplyProto;
+import static
org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto;
import static
org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
import static
org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
+import static
org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString;
class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
@@ -611,46 +646,7 @@ class RaftServerImpl implements RaftServer.Division,
}
RoleInfoProto getRoleInfoProto() {
- RaftPeerRole currentRole = role.getCurrentRole();
- RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
- .setSelf(getPeer().getRaftPeerProto())
- .setRole(currentRole)
- .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs());
- switch (currentRole) {
- case CANDIDATE:
- CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder()
- .setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs());
- roleInfo.setCandidateInfo(candidate);
- break;
-
- case LISTENER:
- case FOLLOWER:
- final Optional<FollowerState> fs = role.getFollowerState();
- final ServerRpcProto leaderInfo = ServerProtoUtils.toServerRpcProto(
- getRaftConf().getPeer(state.getLeaderId()),
-
fs.map(FollowerState::getLastRpcTime).map(Timestamp::elapsedTimeMs).orElse(0L));
- // FollowerState can be null while adding a new peer as it is not
- // a voting member yet
- roleInfo.setFollowerInfo(FollowerInfoProto.newBuilder()
- .setLeaderInfo(leaderInfo)
- .setOutstandingOp(fs.map(FollowerState::getOutstandingOp).orElse(0)));
- break;
-
- case LEADER:
- role.getLeaderState().ifPresent(ls -> {
- final LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
- ls.getLogAppenders().map(LogAppender::getFollower).forEach(f ->
- leader.addFollowerInfo(ServerProtoUtils.toServerRpcProto(
- f.getPeer(), f.getLastRpcResponseTime().elapsedTimeMs())));
- leader.setTerm(ls.getCurrentTerm());
- roleInfo.setLeaderInfo(leader);
- });
- break;
-
- default:
- throw new IllegalStateException("incorrect role of server " +
currentRole);
- }
- return roleInfo.build();
+ return role.buildRoleInfoProto(this);
}
synchronized void changeToCandidate(boolean forceStartLeaderElection) {
@@ -711,7 +707,7 @@ class RaftServerImpl implements RaftServer.Division,
*/
private CompletableFuture<RaftClientReply>
checkLeaderState(RaftClientRequest request, CacheEntry entry) {
try {
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
} catch (GroupMismatchException e) {
return RetryCacheImpl.failWithException(e, entry);
}
@@ -760,15 +756,6 @@ class RaftServerImpl implements RaftServer.Division,
getMemberId() + " is not in " + expected + ": current state is " + c),
expected);
}
- void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws
GroupMismatchException {
- final RaftGroupId groupId = getMemberId().getGroupId();
- if (!groupId.equals(requestorGroupId)) {
- throw new GroupMismatchException(getMemberId()
- + ": The group (" + requestorGroupId + ") of " + requestorId
- + " does not match the group (" + groupId + ") of the server " +
getId());
- }
- }
-
/**
* Append a transaction to the log for processing a client request.
* Note that the given request could be different from {@link
TransactionContext#getClientRequest()}
@@ -1002,8 +989,7 @@ class RaftServerImpl implements RaftServer.Division,
if (leaderId == null) {
return JavaUtils.completeExceptionally(new
ReadIndexException(getMemberId() + ": Leader is unknown."));
}
- final ReadIndexRequestProto request =
- ServerProtoUtils.toReadIndexRequestProto(clientRequest, getMemberId(),
leaderId);
+ final ReadIndexRequestProto request =
toReadIndexRequestProto(clientRequest, getMemberId(), leaderId);
try {
return getServerRpc().async().readIndexAsync(request);
} catch (IOException e) {
@@ -1180,7 +1166,7 @@ class RaftServerImpl implements RaftServer.Division,
LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
synchronized (this) {
CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
@@ -1221,7 +1207,7 @@ class RaftServerImpl implements RaftServer.Division,
CompletableFuture<RaftClientReply>
takeSnapshotAsync(SnapshotManagementRequest request) throws IOException {
LOG.info("{}: takeSnapshotAsync {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
//TODO(liuyaolong): get the gap value from shell command
long minGapValue =
RaftServerConfigKeys.Snapshot.creationGap(proxy.getProperties());
@@ -1253,7 +1239,7 @@ class RaftServerImpl implements RaftServer.Division,
throws IOException {
LOG.info("{} receive leaderElectionManagement request {}", getMemberId(),
request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
final LeaderElectionManagementRequest.Pause pause = request.getPause();
if (pause != null) {
@@ -1272,7 +1258,7 @@ class RaftServerImpl implements RaftServer.Division,
CompletableFuture<RaftClientReply>
stepDownLeaderAsync(TransferLeadershipRequest request) throws IOException {
LOG.info("{} receive stepDown leader request {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
return role.getLeaderState().map(leader ->
leader.submitStepDownRequestAsync(request))
.orElseGet(() -> CompletableFuture.completedFuture(
@@ -1289,7 +1275,7 @@ class RaftServerImpl implements RaftServer.Division,
public CompletableFuture<RaftClientReply>
setConfigurationAsync(SetConfigurationRequest request) throws IOException {
LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(request.getRequestorId(), request.getRaftGroupId());
+ assertGroup(getMemberId(), request);
CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
if (reply != null) {
@@ -1368,15 +1354,13 @@ class RaftServerImpl implements RaftServer.Division,
}
/**
- * check if the remote peer is not included in the current conf
- * and should shutdown. should shutdown if all the following stands:
- * 1. this is a leader
+ * The remote peer should shut down if all the following are true.
+ * 1. this is the current leader
* 2. current conf is stable and has been committed
- * 3. candidate id is not included in conf
- * 4. candidate's last entry's index < conf's index
+ * 3. candidate is not in the current conf
+ * 4. candidate last entry index < conf index (the candidate was removed)
*/
- private boolean shouldSendShutdown(RaftPeerId candidateId,
- TermIndex candidateLastEntry) {
+ private boolean shouldSendShutdown(RaftPeerId candidateId, TermIndex
candidateLastEntry) {
return getInfo().isLeader()
&& getRaftConf().isStable()
&& getState().isConfCommitted()
@@ -1403,7 +1387,7 @@ class RaftServerImpl implements RaftServer.Division,
LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
getMemberId(), phase, candidateId, candidateGroupId, candidateTerm,
candidateLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(candidateId, candidateGroupId);
+ assertGroup(getMemberId(), candidateId, candidateGroupId);
boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
@@ -1430,49 +1414,16 @@ class RaftServerImpl implements RaftServer.Division,
} else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
shouldShutdown = true;
}
- reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId,
getMemberId(),
+ reply = toRequestVoteReplyProto(candidateId, getMemberId(),
voteGranted, state.getCurrentTerm(), shouldShutdown);
if (LOG.isInfoEnabled()) {
LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
- getMemberId(), phase,
ServerStringUtils.toRequestVoteReplyString(reply), state);
+ getMemberId(), phase, toRequestVoteReplyString(reply), state);
}
}
return reply;
}
- private void validateEntries(long expectedTerm, TermIndex previous,
- List<LogEntryProto> entries) {
- if (entries != null && !entries.isEmpty()) {
- final long index0 = entries.get(0).getIndex();
- // Check if next entry's index is 1 greater than the snapshotIndex. If
yes, then
- // we do not have to check for the existence of previous.
- if (index0 != state.getSnapshotIndex() + 1) {
- if (previous == null || previous.getTerm() == 0) {
- Preconditions.assertTrue(index0 == 0,
- "Unexpected Index: previous is null but
entries[%s].getIndex()=%s",
- 0, index0);
- } else {
- Preconditions.assertTrue(previous.getIndex() == index0 - 1,
- "Unexpected Index: previous is %s but entries[%s].getIndex()=%s",
- previous, 0, index0);
- }
- }
-
- for (int i = 0; i < entries.size(); i++) {
- LogEntryProto entry = entries.get(i);
- final long t = entry.getTerm();
- Preconditions.assertTrue(expectedTerm >= t,
- "Unexpected Term: entries[%s].getTerm()=%s but expectedTerm=%s",
- i, t, expectedTerm);
-
- final long indexi = entry.getIndex();
- Preconditions.assertTrue(indexi == index0 + i,
- "Unexpected Index: entries[%s].getIndex()=%s but
entries[0].getIndex()=%s",
- i, indexi, index0);
- }
- }
- }
-
@Override
public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto r)
throws IOException {
@@ -1499,8 +1450,8 @@ class RaftServerImpl implements RaftServer.Division,
if (!startComplete.get()) {
throw new ServerNotReadyException(getMemberId() + ": The server role
is not yet initialized.");
}
- assertGroup(leaderId, leaderGroupId);
- validateEntries(r.getLeaderTerm(), previous, r.getEntriesList());
+ assertGroup(getMemberId(), leaderId, leaderGroupId);
+ assertEntries(r, previous, state);
return appendEntriesAsync(leaderId, request.getCallId(), previous,
requestRef);
} catch(Exception t) {
@@ -1519,14 +1470,12 @@ class RaftServerImpl implements RaftServer.Division,
final LeaderStateImpl leader = role.getLeaderState().orElse(null);
if (leader == null) {
- return CompletableFuture.completedFuture(
- ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false,
RaftLog.INVALID_LOG_INDEX));
+ return CompletableFuture.completedFuture(toReadIndexReplyProto(peerId,
getMemberId()));
}
return
getReadIndex(ClientProtoUtils.toRaftClientRequest(request.getClientRequest()),
leader)
- .thenApply(index -> ServerProtoUtils.toReadIndexReplyProto(peerId,
getMemberId(), true, index))
- .exceptionally(throwable ->
- ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(),
false, RaftLog.INVALID_LOG_INDEX));
+ .thenApply(index -> toReadIndexReplyProto(peerId, getMemberId(), true,
index))
+ .exceptionally(throwable -> toReadIndexReplyProto(peerId,
getMemberId()));
}
static void logAppendEntries(boolean isHeartbeat, Supplier<String> message) {
@@ -1731,37 +1680,37 @@ class RaftServerImpl implements RaftServer.Division,
if (!request.hasLeaderLastEntry()) {
// It should have a leaderLastEntry since there is a placeHolder entry.
LOG.warn("{}: leaderLastEntry is missing in {}", getMemberId(), request);
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId,
getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
final TermIndex leaderLastEntry =
TermIndex.valueOf(request.getLeaderLastEntry());
LOG.debug("{}: receive startLeaderElection from {} with lastEntry {}",
getMemberId(), leaderId, leaderLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(leaderId, leaderGroupId);
+ assertGroup(getMemberId(), leaderId, leaderGroupId);
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
final boolean recognized = state.recognizeLeader("startLeaderElection",
leaderId, leaderLastEntry.getTerm());
if (!recognized) {
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId,
getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
if (!getInfo().isFollower()) {
LOG.warn("{} refused StartLeaderElectionRequest from {}, because role
is:{}",
getMemberId(), leaderId, role.getCurrentRole());
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId,
getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
if (ServerState.compareLog(state.getLastEntry(), leaderLastEntry) < 0) {
LOG.warn("{} refused StartLeaderElectionRequest from {}, because
lastEntry:{} less than leaderEntry:{}",
getMemberId(), leaderId, leaderLastEntry, state.getLastEntry());
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId,
getMemberId(), false);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
changeToCandidate(true);
- return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId,
getMemberId(), true);
+ return toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index fe2bc963b..5eb01a9d6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -18,8 +18,14 @@
package org.apache.ratis.server.impl;
+import org.apache.ratis.proto.RaftProtos.CandidateInfoProto;
+import org.apache.ratis.proto.RaftProtos.FollowerInfoProto;
+import org.apache.ratis.proto.RaftProtos.LeaderInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.Timestamp;
@@ -32,6 +38,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.ratis.server.impl.ServerProtoUtils.toServerRpcProto;
+
/**
* Maintain the Role of a Raft Peer.
*/
@@ -141,6 +149,55 @@ class RoleInfo {
return updated;
}
+ RoleInfoProto buildRoleInfoProto(RaftServerImpl server) {
+ final RaftPeerRole currentRole = getCurrentRole();
+ final RoleInfoProto.Builder proto = RoleInfoProto.newBuilder()
+ .setSelf(server.getPeer().getRaftPeerProto())
+ .setRole(currentRole)
+ .setRoleElapsedTimeMs(getRoleElapsedTimeMs());
+
+ switch (currentRole) {
+ case LEADER:
+ getLeaderState().ifPresent(leader -> {
+ final LeaderInfoProto.Builder b = LeaderInfoProto.newBuilder()
+ .setTerm(leader.getCurrentTerm());
+ leader.getLogAppenders()
+ .map(LogAppender::getFollower)
+ .map(f -> toServerRpcProto(f.getPeer(),
f.getLastRpcResponseTime().elapsedTimeMs()))
+ .forEach(b::addFollowerInfo);
+ proto.setLeaderInfo(b);
+ });
+ return proto.build();
+
+ case CANDIDATE:
+ return proto.setCandidateInfo(CandidateInfoProto.newBuilder()
+
.setLastLeaderElapsedTimeMs(server.getState().getLastLeaderElapsedTimeMs()))
+ .build();
+
+ case LISTENER:
+ case FOLLOWER:
+ // FollowerState can be null while adding a new peer as it is not a
voting member yet
+ final FollowerState follower = getFollowerState().orElse(null);
+ final long rpcElapsed;
+ final int outstandingOp;
+ if (follower != null) {
+ rpcElapsed = follower.getLastRpcTime().elapsedTimeMs();
+ outstandingOp = follower.getOutstandingOp();
+ } else {
+ rpcElapsed = 0;
+ outstandingOp = 0;
+ }
+ final RaftPeer leader =
server.getRaftConf().getPeer(server.getState().getLeaderId());
+ return proto.setFollowerInfo(FollowerInfoProto.newBuilder()
+ .setLeaderInfo(toServerRpcProto(leader, rpcElapsed))
+ .setOutstandingOp(outstandingOp))
+ .build();
+
+ default:
+ throw new IllegalStateException("Unexpected role " + currentRole);
+ }
+ }
+
@Override
public String toString() {
return String.format("%9s", role);
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index e4fe8f232..e26c6e0ab 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,9 +19,15 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
@@ -35,7 +41,6 @@ import org.apache.ratis.util.TimeDuration;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.TimeUnit;
/** Server utilities for internal use. */
@@ -88,7 +93,51 @@ public final class ServerImplUtils {
}
static long effectiveCommitIndex(long leaderCommitIndex, TermIndex
followerPrevious, int numAppendEntries) {
- final long p =
Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
- return Math.min(leaderCommitIndex, p + numAppendEntries);
+ final long previous = followerPrevious != null?
followerPrevious.getIndex() : RaftLog.LEAST_VALID_LOG_INDEX;
+ return Math.min(leaderCommitIndex, previous + numAppendEntries);
+ }
+
+ static void assertGroup(RaftGroupMemberId serverMemberId, RaftClientRequest
request) throws GroupMismatchException {
+ assertGroup(serverMemberId, request.getRequestorId(),
request.getRaftGroupId());
+ }
+
+ static void assertGroup(RaftGroupMemberId localMemberId, Object remoteId,
RaftGroupId remoteGroupId)
+ throws GroupMismatchException {
+ final RaftGroupId localGroupId = localMemberId.getGroupId();
+ if (!localGroupId.equals(remoteGroupId)) {
+ throw new GroupMismatchException(localMemberId
+ + ": The group (" + remoteGroupId + ") of remote " + remoteId
+ + " does not match the group (" + localGroupId + ") of local " +
localMemberId.getPeerId());
+ }
+ }
+
+ static void assertEntries(AppendEntriesRequestProto proto, TermIndex
previous, ServerState state) {
+ final List<LogEntryProto> entries = proto.getEntriesList();
+ if (entries != null && !entries.isEmpty()) {
+ final long index0 = entries.get(0).getIndex();
+ // Check if next entry's index is 1 greater than the snapshotIndex. If
yes, then
+ // we do not have to check for the existence of previous.
+ if (index0 != state.getSnapshotIndex() + 1) {
+ final long expected = previous == null || previous.getTerm() == 0 ? 0
: previous.getIndex() + 1;
+ Preconditions.assertTrue(index0 == expected,
+ "Unexpected Index: previous is %s but entries[%s].getIndex() == %s
!= %s",
+ previous, 0, index0, expected);
+ }
+
+ final long leaderTerm = proto.getLeaderTerm();
+ for (int i = 0; i < entries.size(); i++) {
+ final LogEntryProto entry = entries.get(i);
+ final long entryTerm = entry.getTerm();
+ Preconditions.assertTrue(entryTerm <= leaderTerm ,
+ "Unexpected Term: entries[%s].getTerm() == %s > leaderTerm == %s",
+ i, entryTerm, leaderTerm);
+
+ final long indexI = entry.getIndex();
+ final long expected = index0 + i;
+ Preconditions.assertTrue(indexI == expected,
+ "Unexpected Index: entries[0].getIndex() == %s but
entries[%s].getIndex() == %s != %s",
+ index0, i, indexI, expected);
+ }
+ }
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index f2be8c61c..e35cb2386 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -126,6 +126,10 @@ final class ServerProtoUtils {
.build();
}
+ static ReadIndexReplyProto toReadIndexReplyProto(RaftPeerId requestorId,
RaftGroupMemberId replyId) {
+ return toReadIndexReplyProto(requestorId, replyId, false,
RaftLog.INVALID_LOG_INDEX);
+ }
+
@SuppressWarnings("parameternumber")
static AppendEntriesReplyProto toAppendEntriesReplyProto(
RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 7aae944a4..3e5ac2b67 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -113,7 +113,7 @@ class SnapshotInstallationHandler {
CodeInjectionForTesting.execute(RaftServerImpl.INSTALL_SNAPSHOT,
server.getId(), leaderId, request);
server.assertLifeCycleState(LifeCycle.States.STARTING_OR_RUNNING);
- server.assertGroup(leaderId, leaderGroupId);
+ ServerImplUtils.assertGroup(getMemberId(), leaderId, leaderGroupId);
InstallSnapshotReplyProto reply = null;
// Check if install snapshot from Leader is enabled