Repository: incubator-ratis Updated Branches: refs/heads/master a4fd89473 -> c2423179b
RATIS-286. Add information about raft peers and rpc delay in ServerInformationReply. Contributed by Mukul Kumar Singh. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/c2423179 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/c2423179 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/c2423179 Branch: refs/heads/master Commit: c2423179b8390a7ece679465e50b9abd47b4686d Parents: a4fd894 Author: Mukul Kumar Singh <[email protected]> Authored: Wed Aug 1 07:37:54 2018 +0530 Committer: Mukul Kumar Singh <[email protected]> Committed: Wed Aug 1 07:37:54 2018 +0530 ---------------------------------------------------------------------- .../ratis/client/impl/ClientProtoUtils.java | 7 +-- .../ratis/protocol/ServerInformationReply.java | 24 +++------ ratis-proto-shaded/src/main/proto/Raft.proto | 37 ++++++++++++-- .../apache/ratis/server/impl/FollowerState.java | 4 ++ .../ratis/server/impl/RaftServerImpl.java | 53 ++++++++++++++++++-- 5 files changed, 99 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c2423179/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index 33351c6..1c41a3b 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -188,6 +188,8 @@ public interface ClientProtoUtils { if (reply.getRaftGroupId() != null) { b.setGroup(ProtoUtils.toRaftGroupProtoBuilder(reply.getGroup())); } + b.setIsRaftStorageHealthy(reply.isRaftStorageHealthy()); + b.setRole(reply.getRoleInfoProto()); ProtoUtils.addCommitInfos(reply.getCommitInfos(), i -> b.addCommitInfos(i)); } return b.build(); @@ -230,11 +232,10 @@ public interface ClientProtoUtils { ClientId clientId = ClientId.valueOf(rp.getRequestorId()); final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId()); final RaftGroup raftGroup = ProtoUtils.toRaftGroup(replyProto.getGroup()); - RaftPeerRole role = replyProto.getRole(); + RoleInfoProto role = replyProto.getRole(); boolean isRaftStorageHealthy = replyProto.getIsRaftStorageHealthy(); - long roleElapsedTime = replyProto.getRoleElapsedTimeMs(); return new ServerInformationReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), - groupId, rp.getCallId(), rp.getSuccess(), role, roleElapsedTime, isRaftStorageHealthy, + groupId, rp.getCallId(), rp.getSuccess(), role, isRaftStorageHealthy, replyProto.getCommitInfosList(), raftGroup); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c2423179/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java index 9c4eaa8..feb326e 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java @@ -17,7 +17,7 @@ */ package org.apache.ratis.protocol; -import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto; import java.util.Collection; @@ -27,27 +27,24 @@ import java.util.Collection; */ public class ServerInformationReply extends RaftClientReply { private final RaftGroup group; - private final RaftProtos.RaftPeerRole role; - private final long roleElapsedTime; + private final RoleInfoProto roleInfoProto; private final boolean isRaftStorageHealthy; public ServerInformationReply( - RaftClientRequest request, RaftProtos.RaftPeerRole role, long roleElapsedTime, + RaftClientRequest request, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { super(request, commitInfos); - this.role = role; - this.roleElapsedTime = roleElapsedTime; + this.roleInfoProto = roleInfoProto; this.isRaftStorageHealthy = isRaftStorageHealthy; this.group = group; } public ServerInformationReply( ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, - long callId, boolean success, RaftProtos.RaftPeerRole role, long roleElapsedTime, + long callId, boolean success, RoleInfoProto roleInfoProto, boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { super(clientId, serverId, groupId, callId, success, null, null, commitInfos); - this.role = role; - this.roleElapsedTime = roleElapsedTime; + this.roleInfoProto = roleInfoProto; this.isRaftStorageHealthy = isRaftStorageHealthy; this.group = group; } @@ -56,13 +53,8 @@ public class ServerInformationReply extends RaftClientReply { return group; } - - public RaftProtos.RaftPeerRole getRole() { - return role; - } - - public long getRoleElapsedTime() { - return roleElapsedTime; + public RoleInfoProto getRoleInfoProto() { + return roleInfoProto; } public boolean isRaftStorageHealthy() { http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c2423179/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index c34e5c4..2965f97 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -254,11 +254,40 @@ message ServerInformationRequestProto { RaftRpcRequestProto rpcRequest = 1; } +message ServerRpcDelayProto { + RaftPeerProto id = 1; + uint64 lastRpcElapsedTimeMs = 2; +} + +message LeaderInfoProto { + repeated ServerRpcDelayProto followerInfo = 1; +} + +message FollowerInfoProto { + ServerRpcDelayProto leaderInfo = 1; + bool inLogSync = 2; +} + +message CandidateInfoProto { + // nothing to add for candidate +} + +message RoleInfoProto { + RaftPeerProto self = 1; + RaftPeerRole role = 2; + uint64 roleElapsedTimeMs = 3; + + oneof PeerInfo { + LeaderInfoProto leaderInfo = 4; + FollowerInfoProto followerInfo = 5; + CandidateInfoProto candidateInfo = 6; + } +} + message ServerInformationReplyProto { RaftRpcReplyProto rpcReply = 1; RaftGroupProto group = 2; - RaftPeerRole role = 3; - uint64 roleElapsedTimeMs = 4; - bool isRaftStorageHealthy = 5; - repeated CommitInfoProto commitInfos = 6; + RoleInfoProto role = 3; + bool isRaftStorageHealthy = 4; + repeated CommitInfoProto commitInfos = 5; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c2423179/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java index 3fb5ecb..f526091 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java @@ -49,6 +49,10 @@ class FollowerState extends Daemon { return lastRpcTime; } + public boolean isInLogSync() { + return inLogSync; + } + boolean shouldWithholdVotes() { return lastRpcTime.elapsedTimeMs() < server.getMinTimeoutMs(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/c2423179/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 8d82f3a..2ef8125 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -44,6 +44,7 @@ import java.util.concurrent.*; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.ratis.server.impl.ServerProtoUtils.toRaftConfiguration; import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.*; @@ -371,9 +372,55 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou ServerInformationReply getServerInformation(ServerInformationRequest request) { final RaftGroup group = new RaftGroup(groupId, getRaftConf().getPeers()); - return new ServerInformationReply(request, role.getCurrentRole(), - role.getRoleElapsedTimeMs(), state.getStorage().getStorageDir().hasMetaFile(), - getCommitInfos(), group); + return new ServerInformationReply(request, getRoleInfoProto(), + state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), group); + } + + private RoleInfoProto getRoleInfoProto() { + RaftPeerRole currentRole = role.getCurrentRole(); + RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder() + .setSelf(ProtoUtils.toRaftPeerProto(getPeer())) + .setRole(currentRole) + .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()); + switch (currentRole) { + case CANDIDATE: + roleInfo.setCandidateInfo(CandidateInfoProto.getDefaultInstance()); + break; + + case FOLLOWER: + FollowerInfoProto.Builder follower = FollowerInfoProto.newBuilder() + .setLeaderInfo(getServerRpcDelayProto( + getRaftConf().getPeer(state.getLeaderId()), + heartbeatMonitor.getLastRpcTime().elapsedTimeMs())) + .setInLogSync(heartbeatMonitor.isInLogSync()); + roleInfo.setFollowerInfo(follower); + break; + + case LEADER: + LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder(); + Stream<LogAppender> stream = getLeaderState().getLogAppenders(); + stream.forEach(appender -> + leader.addFollowerInfo(getServerRpcDelayProto( + appender.getFollower().getPeer(), + appender.getFollower().getLastRpcResponseTime().elapsedTimeMs()))); + roleInfo.setLeaderInfo(leader); + break; + + default: + throw new IllegalStateException("incorrect role of server " + currentRole); + } + return roleInfo.build(); + } + + private ServerRpcDelayProto getServerRpcDelayProto (RaftPeer peer, long delay) { + if (peer == null) { + // if no peer information return empty + return ServerRpcDelayProto.getDefaultInstance(); + } + return ServerRpcDelayProto.newBuilder() + .setId(ProtoUtils.toRaftPeerProto(peer)) + .setLastRpcElapsedTimeMs(delay) + .build(); } synchronized void changeToCandidate() {
