This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new eb3feb5 RATIS-605. Change RaftServerImpl and proto to use RaftGroupMemberId.
eb3feb5 is described below
commit eb3feb5af57a13173be58b9ba18c382ce4a5da5b
Author: Tsz Wo Nicholas Sze <[email protected]>
AuthorDate: Fri Jul 5 15:47:15 2019 +0800
RATIS-605. Change RaftServerImpl and proto to use RaftGroupMemberId.
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 29 ++---
.../ratis/protocol/LeaderNotReadyException.java | 14 +-
.../apache/ratis/protocol/NotLeaderException.java | 5 +-
.../org/apache/ratis/protocol/RaftClientReply.java | 7 +
.../ratis/protocol/StateMachineException.java | 7 +-
.../java/org/apache/ratis/util/ProtoUtils.java | 16 +++
ratis-proto/src/main/proto/Raft.proto | 7 +-
.../org/apache/ratis/server/impl/LeaderState.java | 6 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 144 ++++++++++-----------
.../org/apache/ratis/server/impl/RoleInfo.java | 2 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 34 ++---
.../org/apache/ratis/server/impl/ServerState.java | 41 +++---
.../ratis/server/impl/StateMachineUpdater.java | 6 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 4 +
14 files changed, 163 insertions(+), 159 deletions(-)
diff --git
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 38977bc..6492c23 100644
---
a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++
b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -156,7 +156,7 @@ public interface ClientProtoUtils {
if (reply.getMessage() != null) {
b.setMessage(toClientMessageEntryProtoBuilder(reply.getMessage()));
}
- ProtoUtils.addCommitInfos(reply.getCommitInfos(), i ->
b.addCommitInfos(i));
+ ProtoUtils.addCommitInfos(reply.getCommitInfos(), b::addCommitInfos);
final NotLeaderException nle = reply.getNotLeaderException();
final StateMachineException sme;
@@ -191,7 +191,7 @@ public interface ClientProtoUtils {
final LeaderNotReadyException lnre = reply.getLeaderNotReadyException();
if (lnre != null) {
LeaderNotReadyExceptionProto.Builder lnreBuilder =
LeaderNotReadyExceptionProto.newBuilder()
- .setRaftPeerId(lnre.getRaftPeerId().toByteString());
+
.setServerId(ProtoUtils.toRaftGroupMemberIdProtoBuilder(lnre.getServerId()));
b.setLeaderNotReadyException(lnreBuilder);
}
}
@@ -224,40 +224,38 @@ public interface ClientProtoUtils {
b.setGroup(ProtoUtils.toRaftGroupProtoBuilder(reply.getGroup()));
b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy());
b.setRole(reply.getRoleInfoProto());
- ProtoUtils.addCommitInfos(reply.getCommitInfos(), i ->
b.addCommitInfos(i));
+ ProtoUtils.addCommitInfos(reply.getCommitInfos(), b::addCommitInfos);
}
}
return b.build();
}
- static RaftClientReply toRaftClientReply(
- RaftClientReplyProto replyProto) {
+ static RaftClientReply toRaftClientReply(RaftClientReplyProto replyProto) {
final RaftRpcReplyProto rp = replyProto.getRpcReply();
+ final RaftGroupMemberId serverMemberId =
ProtoUtils.toRaftGroupMemberId(rp.getReplyId(), rp.getRaftGroupId());
+
final RaftException e;
if (replyProto.getExceptionDetailsCase().equals(NOTLEADEREXCEPTION)) {
NotLeaderExceptionProto nleProto = replyProto.getNotLeaderException();
final RaftPeer suggestedLeader = nleProto.hasSuggestedLeader() ?
ProtoUtils.toRaftPeer(nleProto.getSuggestedLeader()) : null;
final List<RaftPeer> peers =
ProtoUtils.toRaftPeers(nleProto.getPeersInConfList());
- e = new NotLeaderException(RaftPeerId.valueOf(rp.getReplyId()),
suggestedLeader, peers);
+ e = new NotLeaderException(serverMemberId, suggestedLeader, peers);
} else if (replyProto.getExceptionDetailsCase() == NOTREPLICATEDEXCEPTION)
{
final NotReplicatedExceptionProto nre =
replyProto.getNotReplicatedException();
e = new NotReplicatedException(nre.getCallId(), nre.getReplication(),
nre.getLogIndex());
} else if
(replyProto.getExceptionDetailsCase().equals(STATEMACHINEEXCEPTION)) {
StateMachineExceptionProto smeProto =
replyProto.getStateMachineException();
- e = wrapStateMachineException(RaftPeerId.valueOf(rp.getReplyId()),
- smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
- smeProto.getStacktrace());
+ e = wrapStateMachineException(serverMemberId,
+ smeProto.getExceptionClassName(), smeProto.getErrorMsg(),
smeProto.getStacktrace());
} else if
(replyProto.getExceptionDetailsCase().equals(LEADERNOTREADYEXCEPTION)) {
LeaderNotReadyExceptionProto lnreProto =
replyProto.getLeaderNotReadyException();
- e = new
LeaderNotReadyException(RaftPeerId.valueOf(lnreProto.getRaftPeerId()));
+ e = new
LeaderNotReadyException(ProtoUtils.toRaftGroupMemberId(lnreProto.getServerId()));
} else {
e = null;
}
ClientId clientId = ClientId.valueOf(rp.getRequestorId());
- final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId());
- return new RaftClientReply(clientId, RaftPeerId.valueOf(rp.getReplyId()),
- groupId, rp.getCallId(), rp.getSuccess(),
+ return new RaftClientReply(clientId, serverMemberId, rp.getCallId(),
rp.getSuccess(),
toMessage(replyProto.getMessage()), e,
replyProto.getLogIndex(), replyProto.getCommitInfosList());
}
@@ -288,8 +286,7 @@ public interface ClientProtoUtils {
}
static StateMachineException wrapStateMachineException(
- RaftPeerId serverId, String className, String errorMsg,
- ByteString stackTraceBytes) {
+ RaftGroupMemberId memberId, String className, String errorMsg,
ByteString stackTraceBytes) {
StateMachineException sme;
if (className == null) {
sme = new StateMachineException(errorMsg);
@@ -298,7 +295,7 @@ public interface ClientProtoUtils {
Class<?> clazz = Class.forName(className);
final Exception e = ReflectionUtils.instantiateException(
clazz.asSubclass(Exception.class), errorMsg, null);
- sme = new StateMachineException(serverId, e);
+ sme = new StateMachineException(memberId, e);
} catch (Exception e) {
sme = new StateMachineException(className + ": " + errorMsg);
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
index 2b81958..efe795f 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/LeaderNotReadyException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,15 +23,15 @@ package org.apache.ratis.protocol;
* log entry yet. Thus the leader cannot accept any new client requests since
* it cannot determine whether a request is just a retry.
*/
-public class LeaderNotReadyException extends RaftException {
- private final RaftPeerId raftPeerId;
+public class LeaderNotReadyException extends ServerNotReadyException {
+ private final RaftGroupMemberId serverId;
- public LeaderNotReadyException(RaftPeerId id) {
+ public LeaderNotReadyException(RaftGroupMemberId id) {
super(id + " is in LEADER state but not ready yet.");
- this.raftPeerId = id;
+ this.serverId = id;
}
- public RaftPeerId getRaftPeerId() {
- return raftPeerId;
+ public RaftGroupMemberId getServerId() {
+ return serverId;
}
}
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
index a854d8c..ab291b9 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/NotLeaderException.java
@@ -27,9 +27,8 @@ public class NotLeaderException extends RaftException {
/** the client may need to update its RaftPeer list */
private final Collection<RaftPeer> peers;
- public NotLeaderException(RaftPeerId id, RaftPeer suggestedLeader,
Collection<RaftPeer> peers) {
- super("Server " + id + " is not the leader (" + suggestedLeader
- + "). Request must be sent to leader.");
+ public NotLeaderException(RaftGroupMemberId memberId, RaftPeer
suggestedLeader, Collection<RaftPeer> peers) {
+ super("Server " + memberId + " is not the leader" + (suggestedLeader !=
null? " " + suggestedLeader: ""));
this.suggestedLeader = suggestedLeader;
this.peers = peers != null? Collections.unmodifiableCollection(peers):
Collections.emptyList();
Preconditions.assertUnique(this.peers);
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 33bbcd0..0b80ae2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -51,6 +51,13 @@ public class RaftClientReply extends RaftClientMessage {
/** The commit information when the reply is created. */
private final Collection<CommitInfoProto> commitInfos;
+ public RaftClientReply(ClientId clientId, RaftGroupMemberId serverId,
+ long callId, boolean success, Message message, RaftException exception,
+ long logIndex, Collection<CommitInfoProto> commitInfos) {
+ this(clientId, serverId.getPeerId(), serverId.getGroupId(),
+ callId, success, message, exception, logIndex, commitInfos);
+ }
+
public RaftClientReply(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
long callId, boolean success, Message message, RaftException exception,
diff --git
a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index a9710b2..2641b4a 100644
---
a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++
b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,11 @@
package org.apache.ratis.protocol;
public class StateMachineException extends RaftException {
+ public StateMachineException(RaftGroupMemberId serverId, Throwable cause) {
+ this(serverId.getPeerId(), cause);
+ }
+
+ // TODO: remove this constructor in RATIS-609
public StateMachineException(RaftPeerId serverId, Throwable cause) {
// cause.getMessage is added to this exception message as the exception
received through
// RPC call contains similar message but Simulated RPC doesn't. Adding the
message
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2b0b5aa..7422637 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -19,6 +19,7 @@ package org.apache.ratis.util;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftGroupIdProto;
+import org.apache.ratis.proto.RaftProtos.RaftGroupMemberIdProto;
import org.apache.ratis.proto.RaftProtos.RaftGroupProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
@@ -26,6 +27,7 @@ import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -112,6 +114,20 @@ public interface ProtoUtils {
.addAllPeers(toRaftPeerProtos(group.getPeers()));
}
+ static RaftGroupMemberId toRaftGroupMemberId(ByteString peerId,
RaftGroupIdProto groupId) {
+ return RaftGroupMemberId.valueOf(RaftPeerId.valueOf(peerId),
ProtoUtils.toRaftGroupId(groupId));
+ }
+
+ static RaftGroupMemberId toRaftGroupMemberId(RaftGroupMemberIdProto
memberId) {
+ return toRaftGroupMemberId(memberId.getPeerId(), memberId.getGroupId());
+ }
+
+ static RaftGroupMemberIdProto.Builder
toRaftGroupMemberIdProtoBuilder(RaftGroupMemberId memberId) {
+ return RaftGroupMemberIdProto.newBuilder()
+ .setPeerId(memberId.getPeerId().toByteString())
+ .setGroupId(toRaftGroupIdProtoBuilder(memberId.getGroupId()));
+ }
+
static CommitInfoProto toCommitInfoProto(RaftPeer peer, long commitIndex) {
return CommitInfoProto.newBuilder()
.setServer(peer.getRaftPeerProto())
diff --git a/ratis-proto/src/main/proto/Raft.proto
b/ratis-proto/src/main/proto/Raft.proto
index 45b7851..b75569e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -35,6 +35,11 @@ message RaftGroupProto {
repeated RaftPeerProto peers = 2;
}
+message RaftGroupMemberIdProto {
+ bytes peerId = 1;
+ RaftGroupIdProto groupId = 2;
+}
+
message RaftConfigurationProto {
repeated RaftPeerProto peers = 1; // the peers in the current or new conf
repeated RaftPeerProto oldPeers = 2; // the peers in the old conf
@@ -270,7 +275,7 @@ message NotLeaderExceptionProto {
}
message LeaderNotReadyExceptionProto {
- bytes raftPeerId = 1; // id of the peer
+ RaftGroupMemberIdProto serverId = 1; // id of the leader
}
message NotReplicatedExceptionProto {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 2271908..e61c7e4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -208,7 +208,7 @@ public class LeaderState {
this.watchRequests = new WatchRequests(server.getId(), properties);
final RaftConfiguration conf = server.getRaftConf();
- Collection<RaftPeer> others = conf.getOtherPeers(state.getSelfId());
+ Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
placeHolderIndex = raftLog.getNextIndex();
senders = new SenderList();
@@ -360,8 +360,8 @@ public class LeaderState {
AppendEntriesRequestProto newAppendEntriesRequestProto(RaftPeerId targetId,
TermIndex previous, List<LogEntryProto> entries, boolean initializing,
long callId) {
- return ServerProtoUtils.toAppendEntriesRequestProto(server.getId(),
targetId,
- server.getGroupId(), currentTerm, entries,
raftLog.getLastCommittedIndex(),
+ return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(),
targetId,
+ currentTerm, entries, raftLog.getLastCommittedIndex(),
initializing, previous, server.getCommitInfos(), callId);
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b819e29..d93bcec 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -77,8 +77,6 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
private final LifeCycle lifeCycle;
private final ServerState state;
- private final RaftGroupId groupId;
- private final RaftGroupMemberId memberId; // TODO: move it to ServerState;
see RATIS-605
private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() -> new
RaftPeer(getId(), getServerRpc().getInetSocketAddress()));
private final RoleInfo role;
@@ -92,8 +90,6 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy
proxy) throws IOException {
final RaftPeerId id = proxy.getId();
LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
- this.groupId = group.getGroupId();
- this.memberId = RaftGroupMemberId.valueOf(id, groupId);
this.lifeCycle = new LifeCycle(id);
this.stateMachine = stateMachine;
this.role = new RoleInfo(id);
@@ -149,7 +145,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
}
public RaftGroupId getGroupId() {
- return groupId;
+ return getMemberId().getGroupId();
}
public StateMachine getStateMachine() {
@@ -170,8 +166,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
}
private void setRole(RaftPeerRole newRole, Object reason) {
- LOG.info("{}:{} changes role from {} to {} at term {} for {}",
- getId(), getGroupId(), this.role, newRole, state.getCurrentTerm(),
reason);
+ LOG.info("{}: changes role from {} to {} at term {} for {}",
+ getMemberId(), this.role, newRole, state.getCurrentTerm(), reason);
this.role.transitionRole(newRole);
}
@@ -179,13 +175,12 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
if (!lifeCycle.compareAndTransition(NEW, STARTING)) {
return false;
}
- LOG.info("{}: start {}", getId(), groupId);
RaftConfiguration conf = getRaftConf();
if (conf != null && conf.contains(getId())) {
- LOG.debug("{} starts as a follower, conf={}", getId(), conf);
+ LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
startAsFollower();
} else {
- LOG.debug("{} starts with initializing state, conf={}", getId(), conf);
+ LOG.info("{}: start with initializing state, conf={}", getMemberId(),
conf);
startInitializing();
}
@@ -227,11 +222,11 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
}
public RaftGroupMemberId getMemberId() {
- return memberId;
+ return getState().getMemberId();
}
public RaftPeerId getId() {
- return getState().getSelfId();
+ return getMemberId().getPeerId();
}
RoleInfo getRole() {
@@ -243,43 +238,43 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
}
RaftGroup getGroup() {
- return RaftGroup.valueOf(groupId, getRaftConf().getPeers());
+ return RaftGroup.valueOf(getGroupId(), getRaftConf().getPeers());
}
public void shutdown(boolean deleteDirectory) {
lifeCycle.checkStateAndClose(() -> {
- LOG.info("{}: shutdown {}", getId(), groupId);
+ LOG.info("{}: shutdown", getMemberId());
try {
jmxAdapter.unregister();
} catch (Exception ignored) {
- LOG.warn("Failed to un-register RaftServer JMX bean for " + getId(),
ignored);
+ LOG.warn("{}: Failed to un-register RaftServer JMX bean",
getMemberId(), ignored);
}
try {
role.shutdownFollowerState();
} catch (Exception ignored) {
- LOG.warn("Failed to shutdown FollowerState for " + getId(), ignored);
+ LOG.warn("{}: Failed to shutdown FollowerState", getMemberId(),
ignored);
}
try{
role.shutdownLeaderElection();
} catch (Exception ignored) {
- LOG.warn("Failed to shutdown LeaderElection for " + getId(), ignored);
+ LOG.warn("{}: Failed to shutdown LeaderElection", getMemberId(),
ignored);
}
try{
role.shutdownLeaderState(true);
} catch (Exception ignored) {
- LOG.warn("Failed to shutdown LeaderState monitor for " + getId(),
ignored);
+ LOG.warn("{}: Failed to shutdown LeaderState monitor", getMemberId(),
ignored);
}
try{
state.close();
} catch (Exception ignored) {
- LOG.warn("Failed to close state for " + getId(), ignored);
+ LOG.warn("{}: Failed to close state", getMemberId(), ignored);
}
if (deleteDirectory) {
final RaftStorageDirectory dir = state.getStorage().getStorageDir();
try {
FileUtils.deleteFully(dir.getRoot());
} catch(Exception ignored) {
- LOG.warn(getId() + ": Failed to remove RaftStorageDirectory " + dir,
ignored);
+ LOG.warn("{}: Failed to remove RaftStorageDirectory {}",
getMemberId(), dir, ignored);
}
}
});
@@ -353,10 +348,10 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
} else {
getRaftConf().getPeers().stream()
- .filter(p -> !p.getId().equals(state.getSelfId()))
.map(RaftPeer::getId)
+ .filter(id -> !id.equals(getId()))
.map(commitInfoCache::get)
- .filter(i -> i != null)
+ .filter(Objects::nonNull)
.forEach(infos::add);
}
return infos;
@@ -419,8 +414,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
@Override
public String toString() {
- return String.format("%8s ", role) + groupId + " " + state
- + " " + lifeCycle.getCurrentState();
+ return role + " " + state + " " + lifeCycle.getCurrentState();
}
/**
@@ -445,8 +439,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
return cacheEntry.getReplyFuture();
}
- final RaftClientReply reply = new RaftClientReply(request,
- new LeaderNotReadyException(getId()), getCommitInfos());
+ final LeaderNotReadyException lnre = new
LeaderNotReadyException(getMemberId());
+ final RaftClientReply reply = new RaftClientReply(request, lnre,
getCommitInfos());
return RetryCache.failWithReply(reply, entry);
}
return null;
@@ -454,28 +448,29 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
NotLeaderException generateNotLeaderException() {
if (lifeCycle.getCurrentState() != RUNNING) {
- return new NotLeaderException(getId(), null, null);
+ return new NotLeaderException(getMemberId(), null, null);
}
RaftPeerId leaderId = state.getLeaderId();
- if (leaderId == null || leaderId.equals(state.getSelfId())) {
+ if (leaderId == null || leaderId.equals(getId())) {
// No idea about who is the current leader. Or the peer is the current
// leader, but it is about to step down. set the suggested leader as
null.
leaderId = null;
}
RaftConfiguration conf = getRaftConf();
Collection<RaftPeer> peers = conf.getPeers();
- return new NotLeaderException(getId(), conf.getPeer(leaderId), peers);
+ return new NotLeaderException(getMemberId(), conf.getPeer(leaderId),
peers);
}
private LifeCycle.State assertLifeCycleState(LifeCycle.State... expected)
throws ServerNotReadyException {
- return lifeCycle.assertCurrentState((n, c) -> new
ServerNotReadyException("Server " + n
- + " is not " + Arrays.toString(expected) + ": current state is " + c),
+ return lifeCycle.assertCurrentState((n, c) -> new ServerNotReadyException(
+ getMemberId() + " is not in " + Arrays.toString(expected) + ": current
state is " + c),
expected);
}
void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws
GroupMismatchException {
+ final RaftGroupId groupId = getGroupId();
if (!groupId.equals(requestorGroupId)) {
- throw new GroupMismatchException(getId()
+ throw new GroupMismatchException(getMemberId()
+ ": The group (" + requestorGroupId + ") of " + requestorId
+ " does not match the group (" + groupId + ") of the server " +
getId());
}
@@ -502,7 +497,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
final PendingRequests.Permit permit =
leaderState.tryAcquirePendingRequest();
if (permit == null) {
return JavaUtils.completeExceptionally(new
ResourceUnavailableException(
- "Failed to acquire a pending write request in " + getId() + " for
" + request));
+ getMemberId() + ": Failed to acquire a pending write request for "
+ request));
}
try {
state.appendLog(context);
@@ -522,7 +517,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
pending = leaderState.addPendingRequest(permit, request, context);
if (pending == null) {
return JavaUtils.completeExceptionally(new
ResourceUnavailableException(
- "Failed to add a pending write request in " + getId() + " for " +
request));
+ getMemberId() + ": Failed to add a pending write request for " +
request));
}
leaderState.notifySenders();
}
@@ -533,7 +528,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
public CompletableFuture<RaftClientReply> submitClientRequestAsync(
RaftClientRequest request) throws IOException {
assertLifeCycleState(RUNNING);
- LOG.debug("{}: receive client request({})", getId(), request);
+ LOG.debug("{}: receive client request({})", getMemberId(), request);
if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
return staleReadAsync(request);
}
@@ -572,7 +567,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
TransactionContext context = stateMachine.startTransaction(request);
if (context.getException() != null) {
RaftClientReply exceptionReply = new RaftClientReply(request,
- new StateMachineException(getId(), context.getException()),
getCommitInfos());
+ new StateMachineException(getMemberId(), context.getException()),
getCommitInfos());
cacheEntry.failWithReply(exceptionReply);
return CompletableFuture.completedFuture(exceptionReply);
}
@@ -589,12 +584,12 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest
request) {
final long minIndex = request.getType().getStaleRead().getMinIndex();
final long commitIndex = state.getLog().getLastCommittedIndex();
- LOG.debug("{}: minIndex={}, commitIndex={}", getId(), minIndex,
commitIndex);
+ LOG.debug("{}: minIndex={}, commitIndex={}", getMemberId(), minIndex,
commitIndex);
if (commitIndex < minIndex) {
final StaleReadException e = new StaleReadException(
"Unable to serve stale-read due to server commit index = " +
commitIndex + " < min = " + minIndex);
return CompletableFuture.completedFuture(
- new RaftClientReply(request, new StateMachineException(getId(), e),
getCommitInfos()));
+ new RaftClientReply(request, new
StateMachineException(getMemberId(), e), getCommitInfos()));
}
return
processQueryFuture(getStateMachine().queryStale(request.getMessage(),
minIndex), request);
}
@@ -614,17 +609,16 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
@Override
public RaftClientReply submitClientRequest(RaftClientRequest request)
throws IOException {
- return waitForReply(getId(), request, submitClientRequestAsync(request));
+ return waitForReply(request, submitClientRequestAsync(request));
}
- RaftClientReply waitForReply(RaftPeerId id,
- RaftClientRequest request, CompletableFuture<RaftClientReply> future)
+ RaftClientReply waitForReply(RaftClientRequest request,
CompletableFuture<RaftClientReply> future)
throws IOException {
- return waitForReply(id, request, future, e -> new RaftClientReply(request,
e, getCommitInfos()));
+ return waitForReply(getMemberId(), request, future, e -> new
RaftClientReply(request, e, getCommitInfos()));
}
static <REPLY extends RaftClientReply> REPLY waitForReply(
- RaftPeerId id, RaftClientRequest request, CompletableFuture<REPLY>
future,
+ Object id, RaftClientRequest request, CompletableFuture<REPLY> future,
Function<RaftException, REPLY> exceptionReply)
throws IOException {
try {
@@ -650,18 +644,16 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
}
@Override
- public RaftClientReply setConfiguration(SetConfigurationRequest request)
- throws IOException {
- return waitForReply(getId(), request, setConfigurationAsync(request));
+ public RaftClientReply setConfiguration(SetConfigurationRequest request)
throws IOException {
+ return waitForReply(request, setConfigurationAsync(request));
}
/**
* Handle a raft configuration change request from client.
*/
@Override
- public CompletableFuture<RaftClientReply> setConfigurationAsync(
- SetConfigurationRequest request) throws IOException {
- LOG.debug("{}: receive setConfiguration({})", getId(), request);
+ public CompletableFuture<RaftClientReply>
setConfigurationAsync(SetConfigurationRequest request) throws IOException {
+ LOG.info("{}: receive setConfiguration {}", getMemberId(), request);
assertLifeCycleState(RUNNING);
assertGroup(request.getRequestorId(), request.getRaftGroupId());
@@ -747,7 +739,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
candidateId, candidateTerm, candidateLastEntry);
LOG.debug("{}: receive requestVote({}, {}, {}, {})",
- getId(), candidateId, candidateGroupId, candidateTerm,
candidateLastEntry);
+ getMemberId(), candidateId, candidateGroupId, candidateTerm,
candidateLastEntry);
assertLifeCycleState(RUNNING);
assertGroup(candidateId, candidateGroupId);
@@ -758,7 +750,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
final FollowerState fs = role.getFollowerState().orElse(null);
if (shouldWithholdVotes(candidateTerm)) {
LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State:
leader={}, term={}, lastRpcElapsed={}",
- getId(), role, candidateId, candidateTerm, state.getLeaderId(),
state.getCurrentTerm(),
+ getMemberId(), role, candidateId, candidateTerm,
state.getLeaderId(), state.getCurrentTerm(),
fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
} else if (state.recognizeCandidate(candidateId, candidateTerm)) {
final boolean termUpdated = changeToFollower(candidateTerm, true,
"recognizeCandidate:" + candidateId);
@@ -775,11 +767,11 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
if (!voteGranted && shouldSendShutdown(candidateId, candidateLastEntry))
{
shouldShutdown = true;
}
- reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getId(),
- groupId, voteGranted, state.getCurrentTerm(), shouldShutdown);
+ reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId,
getMemberId(),
+ voteGranted, state.getCurrentTerm(), shouldShutdown);
if (LOG.isDebugEnabled()) {
LOG.debug("{} replies to vote request: {}. Peer's state: {}",
- getId(), ServerProtoUtils.toString(reply), state);
+ getMemberId(), ServerProtoUtils.toString(reply), state);
}
}
return reply;
@@ -841,7 +833,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
return appendEntriesAsync(requestorId, r.getLeaderTerm(), previous,
r.getLeaderCommit(),
request.getCallId(), r.getInitializing(), r.getCommitInfosList(),
entries);
} catch(Throwable t) {
- LOG.error(getId() + ": Failed appendEntriesAsync " + r, t);
+ LOG.error("{}: Failed appendEntriesAsync {}", getMemberId(), r, t);
throw t;
}
}
@@ -876,7 +868,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
final LifeCycle.State currentState = assertLifeCycleState(STARTING,
RUNNING);
if (currentState == STARTING) {
if (role.getCurrentRole() == null) {
- throw new ServerNotReadyException("The role of Server " + getId() + "
is not yet initialized.");
+ throw new ServerNotReadyException(getMemberId() + ": The server role
is not yet initialized.");
}
}
assertGroup(leaderId, leaderGroupId);
@@ -893,7 +885,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
List<CommitInfoProto> commitInfos, LogEntryProto... entries) {
final boolean isHeartbeat = entries.length == 0;
logAppendEntries(isHeartbeat,
- () -> getId() + ": receive appendEntries(" + leaderId + ", " +
leaderTerm + ", "
+ () -> getMemberId() + ": receive appendEntries(" + leaderId + ", " +
leaderTerm + ", "
+ previous + ", " + leaderCommit + ", " + initializing
+ ", commits" + ProtoUtils.toString(commitInfos)
+ ", entries: " + ServerProtoUtils.toString(entries));
@@ -907,10 +899,10 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
currentTerm = state.getCurrentTerm();
if (!recognized) {
final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getId(), groupId, currentTerm, followerCommit,
state.getNextIndex(), NOT_LEADER, callId);
+ leaderId, getMemberId(), currentTerm, followerCommit,
state.getNextIndex(), NOT_LEADER, callId);
if (LOG.isDebugEnabled()) {
LOG.debug("{}: Not recognize {} (term={}) as leader, state: {}
reply: {}",
- getId(), leaderId, leaderTerm, state,
ServerProtoUtils.toString(reply));
+ getMemberId(), leaderId, leaderTerm, state,
ServerProtoUtils.toString(reply));
}
return CompletableFuture.completedFuture(reply);
}
@@ -957,11 +949,11 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
synchronized(this) {
state.updateStatemachine(leaderCommit, currentTerm);
final long n = isHeartbeat? state.getLog().getNextIndex():
entries[entries.length - 1].getIndex() + 1;
- reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId, getId(),
groupId, currentTerm,
+ reply = ServerProtoUtils.toAppendEntriesReplyProto(leaderId,
getMemberId(), currentTerm,
state.getLog().getLastCommittedIndex(), n, SUCCESS, callId);
}
logAppendEntries(isHeartbeat, () ->
- getId() + ": succeeded to handle AppendEntries. Reply: " +
ServerProtoUtils.toString(reply));
+ getMemberId() + ": succeeded to handle AppendEntries. Reply: " +
ServerProtoUtils.toString(reply));
return reply;
});
}
@@ -974,8 +966,8 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
}
final AppendEntriesReplyProto reply =
ServerProtoUtils.toAppendEntriesReplyProto(
- leaderId, getId(), groupId, currentTerm, followerCommit,
replyNextIndex, INCONSISTENCY, callId);
- LOG.info("{}: inconsistency entries. Reply:{}", getId(),
ServerProtoUtils.toString(reply));
+ leaderId, getMemberId(), currentTerm, followerCommit, replyNextIndex,
INCONSISTENCY, callId);
+ LOG.info("{}: inconsistency entries. Reply:{}", getMemberId(),
ServerProtoUtils.toString(reply));
return reply;
}
@@ -983,7 +975,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
// Check if a snapshot installation through state machine is in progress.
final TermIndex installSnapshot = inProgressInstallSnapshotRequest.get();
if (installSnapshot != null) {
- LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in
progress", getId(), installSnapshot);
+ LOG.info("{}: Failed appendEntries as snapshot ({}) installation is in
progress", getMemberId(), installSnapshot);
return installSnapshot.getIndex();
}
@@ -994,7 +986,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
final long snapshotIndex = state.getSnapshotIndex();
if (snapshotIndex > 0 && snapshotIndex >= firstEntryIndex) {
LOG.info("{}: Failed appendEntries as latest snapshot ({}) already has
the append entries (first index: {})",
- getId(), snapshotIndex, firstEntryIndex);
+ getMemberId(), snapshotIndex, firstEntryIndex);
return snapshotIndex + 1;
}
}
@@ -1002,7 +994,7 @@ public class RaftServerImpl implements RaftServerProtocol,
RaftServerAsynchronou
// Check if "previous" is contained in current state.
if (previous != null && !state.containsTermIndex(previous)) {
final long replyNextIndex = Math.min(state.getNextIndex(),
previous.getIndex());
- LOG.info("{}: Failed appendEntries as previous log entry ({}) is not
found", getId(), previous);
+ LOG.info("{}: Failed appendEntries as previous log entry ({}) is not
found", getMemberId(), previous);
return replyNextIndex;
}
@@ -1085,7 +1077,7 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
Preconditions.assertTrue(
state.getLog().getNextIndex() <= lastIncludedIndex,
"%s log's next id is %s, last included index in snapshot is %s",
- getId(), state.getLog().getNextIndex(), lastIncludedIndex);
+ getMemberId(), state.getLog().getNextIndex(), lastIncludedIndex);
//TODO: We should only update State with installed snapshot once the
request is done.
state.installSnapshot(request);
@@ -1197,8 +1189,7 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
synchronized RequestVoteRequestProto createRequestVoteRequest(
RaftPeerId targetId, long term, TermIndex lastEntry) {
- return ServerProtoUtils.toRequestVoteRequestProto(getId(), targetId,
- groupId, term, lastEntry);
+ return ServerProtoUtils.toRequestVoteRequestProto(getMemberId(), targetId,
term, lastEntry);
}
public void submitUpdateCommitEvent() {
@@ -1219,7 +1210,6 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
// update the retry cache
final ClientId clientId = ClientId.valueOf(smLog.getClientId());
final long callId = smLog.getCallId();
- final RaftPeerId serverId = getId();
final RetryCache.CacheEntry cacheEntry =
retryCache.getOrCreateEntry(clientId, callId);
if (cacheEntry.isFailed()) {
retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey()));
@@ -1229,12 +1219,12 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
return stateMachineFuture.whenComplete((reply, exception) -> {
final RaftClientReply r;
if (exception == null) {
- r = new RaftClientReply(clientId, serverId, groupId, callId, true,
reply, null, logIndex, getCommitInfos());
+ r = new RaftClientReply(clientId, getMemberId(), callId, true, reply,
null, logIndex, getCommitInfos());
} else {
// the exception is coming from the state machine. wrap it into the
// reply as a StateMachineException
- final StateMachineException e = new StateMachineException(getId(),
exception);
- r = new RaftClientReply(clientId, serverId, groupId, callId, false,
null, e, logIndex, getCommitInfos());
+ final StateMachineException e = new
StateMachineException(getMemberId(), exception);
+ r = new RaftClientReply(clientId, getMemberId(), callId, false, null,
e, logIndex, getCommitInfos());
}
// update pending request
@@ -1283,8 +1273,8 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
stateMachine.applyTransaction(trx);
return replyPendingRequest(next, stateMachineFuture);
} catch (Throwable e) {
- LOG.error("{}: applyTransaction failed for index:{} proto:{}", getId(),
- next.getIndex(), ServerProtoUtils.toString(next), e.getMessage());
+ LOG.error("{}: applyTransaction failed for index:{} proto:{}",
+ getMemberId(), next.getIndex(), ServerProtoUtils.toString(next),
e);
throw e;
}
}
@@ -1298,7 +1288,7 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
final long callId = smLog.getCallId();
final RetryCache.CacheEntry cacheEntry = getRetryCache().get(clientId,
callId);
if (cacheEntry != null) {
- final RaftClientReply reply = new RaftClientReply(clientId, getId(),
getGroupId(),
+ final RaftClientReply reply = new RaftClientReply(clientId,
getMemberId(),
callId, false, null, generateNotLeaderException(),
logEntry.getIndex(), getCommitInfos());
cacheEntry.failWithReply(reply);
@@ -1309,7 +1299,7 @@ public class RaftServerImpl implements
RaftServerProtocol, RaftServerAsynchronou
private class RaftServerJmxAdapter extends JmxRegister implements
RaftServerMXBean {
@Override
public String getId() {
- return getState().getSelfId().toString();
+ return getMemberId().getPeerId().toString();
}
@Override
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index a27a086..ae2a0df 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -146,6 +146,6 @@ class RoleInfo {
@Override
public String toString() {
- return "" + role;
+ return String.format("%9s", role);
}
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 5d94929..e688446 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -22,7 +22,6 @@ import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
@@ -290,20 +289,14 @@ public interface ServerProtoUtils {
static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success) {
- return toRaftRpcReplyProtoBuilder(requestorId, replyId.getPeerId(),
replyId.getGroupId(), success);
- }
-
- static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
- RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, boolean
success) {
return ClientProtoUtils.toRaftRpcReplyProtoBuilder(
- requestorId.toByteString(), replyId.toByteString(), groupId,
DEFAULT_CALLID, success);
+ requestorId.toByteString(), replyId.getPeerId().toByteString(),
replyId.getGroupId(), DEFAULT_CALLID, success);
}
static RequestVoteReplyProto toRequestVoteReplyProto(
- RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId,
- boolean success, long term, boolean shouldShutdown) {
+ RaftPeerId requestorId, RaftGroupMemberId replyId, boolean success, long
term, boolean shouldShutdown) {
return RequestVoteReplyProto.newBuilder()
- .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId,
groupId, success))
+ .setServerReply(toRaftRpcReplyProtoBuilder(requestorId, replyId,
success))
.setTerm(term)
.setShouldShutdown(shouldShutdown)
.build();
@@ -311,19 +304,14 @@ public interface ServerProtoUtils {
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
RaftGroupMemberId requestorId, RaftPeerId replyId) {
- return toRaftRpcRequestProtoBuilder(requestorId.getPeerId(), replyId,
requestorId.getGroupId());
- }
-
- static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) {
return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
- requestorId.toByteString(), replyId.toByteString(), groupId,
DEFAULT_CALLID, null);
+ requestorId.getPeerId().toByteString(), replyId.toByteString(),
requestorId.getGroupId(), DEFAULT_CALLID, null);
}
static RequestVoteRequestProto toRequestVoteRequestProto(
- RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long
term, TermIndex lastEntry) {
+ RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex
lastEntry) {
final RequestVoteRequestProto.Builder b =
RequestVoteRequestProto.newBuilder()
- .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId,
groupId))
+ .setServerRequest(toRaftRpcRequestProtoBuilder(requestorId, replyId))
.setCandidateTerm(term);
if (lastEntry != null) {
b.setCandidateLastEntry(toTermIndexProto(lastEntry));
@@ -399,10 +387,10 @@ public interface ServerProtoUtils {
}
static AppendEntriesReplyProto toAppendEntriesReplyProto(
- RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long
term,
+ RaftPeerId requestorId, RaftGroupMemberId replyId, long term,
long followerCommit, long nextIndex, AppendResult result, long callId) {
RaftRpcReplyProto.Builder rpcReply = toRaftRpcReplyProtoBuilder(
- requestorId, replyId, groupId, result == AppendResult.SUCCESS)
+ requestorId, replyId, result == AppendResult.SUCCESS)
.setCallId(callId);
return AppendEntriesReplyProto.newBuilder()
.setServerReply(rpcReply)
@@ -413,10 +401,10 @@ public interface ServerProtoUtils {
}
static AppendEntriesRequestProto toAppendEntriesRequestProto(
- RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId, long
leaderTerm,
+ RaftGroupMemberId requestorId, RaftPeerId replyId, long leaderTerm,
List<LogEntryProto> entries, long leaderCommit, boolean initializing,
TermIndex previous, Collection<CommitInfoProto> commitInfos, long
callId) {
- RaftRpcRequestProto.Builder rpcRequest =
toRaftRpcRequestProtoBuilder(requestorId, replyId, groupId)
+ RaftRpcRequestProto.Builder rpcRequest =
toRaftRpcRequestProtoBuilder(requestorId, replyId)
.setCallId(callId);
final AppendEntriesRequestProto.Builder b = AppendEntriesRequestProto
.newBuilder()
@@ -431,7 +419,7 @@ public interface ServerProtoUtils {
if (previous != null) {
b.setPreviousLog(toTermIndexProto(previous));
}
- ProtoUtils.addCommitInfos(commitInfos, i -> b.addCommitInfos(i));
+ ProtoUtils.addCommitInfos(commitInfos, b::addCommitInfos);
return b.build();
}
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index b40a131..7fbbc66 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -54,8 +54,7 @@ import static org.apache.ratis.server.impl.RaftServerImpl.LOG;
* Common states of a raft peer. Protected by RaftServer's lock.
*/
public class ServerState implements Closeable {
- private final RaftPeerId selfId;
- private final RaftGroupId groupId;
+ private final RaftGroupMemberId memberId;
private final RaftServerImpl server;
/** Raft log */
private final RaftLog log;
@@ -96,13 +95,12 @@ public class ServerState implements Closeable {
ServerState(RaftPeerId id, RaftGroup group, RaftProperties prop,
RaftServerImpl server, StateMachine stateMachine)
throws IOException {
- this.selfId = id;
- this.groupId = group.getGroupId();
+ this.memberId = RaftGroupMemberId.valueOf(id, group.getGroupId());
this.server = server;
RaftConfiguration initialConf = RaftConfiguration.newBuilder()
.setConf(group.getPeers()).build();
configurationManager = new ConfigurationManager(initialConf);
- LOG.info("{}:{} {}", id, groupId, configurationManager);
+ LOG.info("{}: {}", getMemberId(), configurationManager);
// use full uuid string to create a subdirectory
final File dir = chooseStorageDir(RaftServerConfigKeys.storageDirs(prop),
@@ -126,10 +124,12 @@ public class ServerState implements Closeable {
currentTerm.set(metadata.getTerm());
votedFor = metadata.getVotedFor();
- stateMachineUpdater = new StateMachineUpdater(stateMachine, server, log,
- lastApplied, prop);
+ stateMachineUpdater = new StateMachineUpdater(stateMachine, server, this,
lastApplied, prop);
}
+ RaftGroupMemberId getMemberId() {
+ return memberId;
+ }
static File chooseStorageDir(List<File> volumes, String targetSubDir) throws
IOException {
final Map<File, Integer> numberOfStorageDirPerVolume = new HashMap<>();
@@ -161,7 +161,7 @@ public class ServerState implements Closeable {
SnapshotInfo snapshot = sm.getLatestSnapshot();
if (snapshot == null || snapshot.getTermIndex().getIndex() < 0) {
- return RaftServerConstants.INVALID_LOG_INDEX;
+ return RaftLog.INVALID_LOG_INDEX;
}
// get the raft configuration from raft metafile
@@ -201,10 +201,6 @@ public class ServerState implements Closeable {
return configurationManager.getCurrent();
}
- public RaftPeerId getSelfId() {
- return this.selfId;
- }
-
public long getCurrentTerm() {
return currentTerm.get();
}
@@ -231,7 +227,7 @@ public class ServerState implements Closeable {
* Become a candidate and start leader election
*/
long initElection() {
- votedFor = selfId;
+ votedFor = getMemberId().getPeerId();
setLeader(null, "initElection");
return currentTerm.incrementAndGet();
}
@@ -260,8 +256,8 @@ public class ServerState implements Closeable {
lastNoLeaderTime = null;
suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms";
}
- LOG.info("{}:{} change Leader from {} to {} at term {} for {}{}",
- selfId, groupId, leaderId, newLeaderId, getCurrentTerm(), op,
suffix);
+ LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
+ getMemberId(), leaderId, newLeaderId, getCurrentTerm(), op, suffix);
leaderId = newLeaderId;
}
}
@@ -276,7 +272,7 @@ public class ServerState implements Closeable {
}
void becomeLeader() {
- setLeader(selfId, "becomeLeader");
+ setLeader(getMemberId().getPeerId(), "becomeLeader");
}
public RaftLog getLog() {
@@ -340,7 +336,7 @@ public class ServerState implements Closeable {
@Override
public String toString() {
- return selfId + ":t" + currentTerm + ", leader=" + leaderId
+ return getMemberId() + ":t" + currentTerm + ", leader=" + leaderId
+ ", voted=" + votedFor + ", raftlog=" + log + ", conf=" +
getRaftConf();
}
@@ -358,9 +354,8 @@ public class ServerState implements Closeable {
void setRaftConf(long logIndex, RaftConfiguration conf) {
configurationManager.addConfiguration(logIndex, conf);
server.getServerRpc().addPeers(conf.getPeers());
- LOG.info("{}:{} set configuration {} at {}", getSelfId(), groupId, conf,
- logIndex);
- LOG.trace("{}: {}", getSelfId(), configurationManager);
+ LOG.info("{}: set configuration {} at {}", getMemberId(), conf, logIndex);
+ LOG.trace("{}: {}", getMemberId(), configurationManager);
}
void updateConfiguration(LogEntryProto[] entries) {
@@ -388,11 +383,9 @@ public class ServerState implements Closeable {
try {
stateMachineUpdater.stopAndJoin();
} catch (InterruptedException e) {
- LOG.warn(getSelfId() +
- ": Interrupted when joining stateMachineUpdater", e);
+ LOG.warn("{}: Interrupted when joining stateMachineUpdater",
getMemberId(), e);
}
- LOG.info("{}:{} closes. The last applied log index is {}",
- getSelfId(), groupId, getLastAppliedIndex());
+ LOG.info("{}: closes. applyIndex: {}", getMemberId(),
getLastAppliedIndex());
log.close();
storage.close();
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 699d8e0..f0a4c58 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -75,14 +75,14 @@ class StateMachineUpdater implements Runnable {
private volatile State state = State.RUNNING;
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
- RaftLog raftLog, long lastAppliedIndex, RaftProperties properties) {
- this.name = getClass().getSimpleName() + ":" + raftLog.getSelfId() + ":" +
server.getGroupId();
+ ServerState serverState, long lastAppliedIndex, RaftProperties
properties) {
+ this.name = serverState.getMemberId() + "-" + getClass().getSimpleName();
this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
this.stateMachine = stateMachine;
this.server = server;
- this.raftLog = raftLog;
+ this.raftLog = serverState.getLog();
this.appliedIndex = new RaftLogIndex("appliedIndex", lastAppliedIndex);
this.snapshotIndex = new RaftLogIndex("snapshotIndex", lastAppliedIndex);
diff --git
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 353dffb..b209da9 100644
---
a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++
b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -21,6 +21,8 @@ import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -423,6 +425,8 @@ public class TestSegmentedRaftLog extends BaseTest {
RaftServerImpl server = mock(RaftServerImpl.class);
RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
when(server.getRetryCache()).thenReturn(retryCache);
+ final RaftGroupMemberId id =
RaftGroupMemberId.valueOf(RaftPeerId.valueOf("s0"), RaftGroupId.randomId());
+ when(server.getMemberId()).thenReturn(id);
doCallRealMethod().when(server).failClientRequest(any(LogEntryProto.class));
try (SegmentedRaftLog raftLog =
new SegmentedRaftLog(peerId, server, storage, -1, properties)) {