Repository: incubator-ratis Updated Branches: refs/heads/master 4d00b24dd -> a3a833290
RATIS-269. Enhance server information reply to add raft node role, raft storage directory status information. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/a3a83329 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/a3a83329 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/a3a83329 Branch: refs/heads/master Commit: a3a833290b15d0f74a27c03bf87a14bb380fb260 Parents: 4d00b24 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Wed Jul 25 13:17:38 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Wed Jul 25 13:17:38 2018 -0700 ---------------------------------------------------------------------- .../ratis/client/impl/ClientProtoUtils.java | 5 +- .../ratis/protocol/ServerInformationReply.java | 29 ++++++++- .../arithmetic/ArithmeticStateMachine.java | 5 +- ratis-proto-shaded/src/main/proto/Raft.proto | 13 +++- .../org/apache/ratis/server/RaftServer.java | 4 -- .../ratis/server/impl/RaftServerImpl.java | 40 +++++++------ .../org/apache/ratis/server/impl/RoleInfo.java | 62 ++++++++++++++++++++ .../server/storage/RaftStorageDirectory.java | 2 +- .../ratis/statemachine/TransactionContext.java | 4 +- .../impl/TransactionContextImpl.java | 12 ++-- 10 files changed, 138 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java ---------------------------------------------------------------------- diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java index f32d6b9..33351c6 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java @@ -230,8 +230,11 @@ public interface ClientProtoUtils { ClientId clientId = ClientId.valueOf(rp.getRequestorId()); final RaftGroupId groupId = ProtoUtils.toRaftGroupId(rp.getRaftGroupId()); final RaftGroup raftGroup = ProtoUtils.toRaftGroup(replyProto.getGroup()); + RaftPeerRole role = replyProto.getRole(); + boolean isRaftStorageHealthy = replyProto.getIsRaftStorageHealthy(); + long roleElapsedTime = replyProto.getRoleElapsedTimeMs(); return new ServerInformationReply(clientId, RaftPeerId.valueOf(rp.getReplyId()), - groupId, rp.getCallId(), rp.getSuccess(), + groupId, rp.getCallId(), rp.getSuccess(), role, roleElapsedTime, isRaftStorageHealthy, replyProto.getCommitInfosList(), raftGroup); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java ---------------------------------------------------------------------- diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java index d06b251..9c4eaa8 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformationReply.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.protocol; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto; import java.util.Collection; @@ -26,21 +27,45 @@ import java.util.Collection; */ public class ServerInformationReply extends RaftClientReply { private final RaftGroup group; + private final RaftProtos.RaftPeerRole role; + private final long roleElapsedTime; + private final boolean isRaftStorageHealthy; public ServerInformationReply( - RaftClientRequest request, Collection<CommitInfoProto> commitInfos, RaftGroup group) { + RaftClientRequest request, RaftProtos.RaftPeerRole role, long roleElapsedTime, + boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { super(request, commitInfos); + this.role = role; + this.roleElapsedTime = roleElapsedTime; + this.isRaftStorageHealthy = isRaftStorageHealthy; this.group = group; } public ServerInformationReply( ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, - long callId, boolean success, Collection<CommitInfoProto> commitInfos, RaftGroup group) { + long callId, boolean success, RaftProtos.RaftPeerRole role, long roleElapsedTime, + boolean isRaftStorageHealthy, Collection<CommitInfoProto> commitInfos, RaftGroup group) { super(clientId, serverId, groupId, callId, success, null, null, commitInfos); + this.role = role; + this.roleElapsedTime = roleElapsedTime; + this.isRaftStorageHealthy = isRaftStorageHealthy; this.group = group; } public RaftGroup getGroup() { return group; } + + + public RaftProtos.RaftPeerRole getRole() { + return role; + } + + public long getRoleElapsedTime() { + return roleElapsedTime; + } + + public boolean isRaftStorageHealthy() { + return isRaftStorageHealthy; + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java index e35fb9c..9b5c518 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java @@ -24,6 +24,7 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; @@ -162,8 +163,8 @@ public class ArithmeticStateMachine extends BaseStateMachine { final Expression r = Expression.Utils.double2Expression(result); final CompletableFuture<Message> f = CompletableFuture.completedFuture(Expression.Utils.toMessage(r)); - final RaftServer.Role role = trx.getServerRole(); - if (role == RaftServer.Role.LEADER) { + final RaftProtos.RaftPeerRole role = trx.getServerRole(); + if (role == RaftProtos.RaftPeerRole.LEADER) { LOG.info("{}:{}-{}: {} = {}", role, getId(), index, assignment, r); } else { LOG.debug("{}:{}-{}: {} = {}", role, getId(), index, assignment, r); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 303ec2b..eb8b0b5 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -175,6 +175,14 @@ enum ReplicationLevel { ALL = 1; } + +/** Role of raft peer */ +enum RaftPeerRole { + LEADER = 0; + CANDIDATE = 1; + FOLLOWER = 2; +} + message WriteRequestTypeProto { ReplicationLevel replication = 1; } @@ -248,5 +256,8 @@ message ServerInformationRequestProto { message ServerInformationReplyProto { RaftRpcReplyProto rpcReply = 1; RaftGroupProto group = 2; - repeated CommitInfoProto commitInfos = 3; + RaftPeerRole role = 3; + uint64 roleElapsedTimeMs = 4; + bool isRaftStorageHealthy = 5; + repeated CommitInfoProto commitInfos = 6; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java index 8ad7873..daae997 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java @@ -36,10 +36,6 @@ public interface RaftServer extends Closeable, RpcType.Get, RaftServerProtocol, RaftServerAsynchronousProtocol, RaftClientProtocol, RaftClientAsynchronousProtocol, AdminProtocol, AdminAsynchronousProtocol { - /** The role of a raft server. */ - enum Role { - LEADER, CANDIDATE, FOLLOWER - } /** @return the server ID. */ RaftPeerId getId(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index bbfbb02..8d82f3a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -22,7 +22,6 @@ import org.apache.ratis.protocol.*; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerMXBean; import org.apache.ratis.server.RaftServerRpc; -import org.apache.ratis.server.RaftServer.Role; import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol; import org.apache.ratis.server.protocol.RaftServerProtocol; import org.apache.ratis.server.protocol.TermIndex; @@ -70,7 +69,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou private final ServerState state; private final RaftGroupId groupId; private final Supplier<RaftPeer> peerSupplier = JavaUtils.memoize(() -> new RaftPeer(getId(), getServerRpc().getInetSocketAddress())); - private volatile Role role; + private final RoleInfo role; /** used when the peer is follower, to monitor election timeout */ private volatile FollowerState heartbeatMonitor; @@ -92,6 +91,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou this.groupId = group.getGroupId(); this.lifeCycle = new LifeCycle(id); this.stateMachine = stateMachine; + this.role = new RoleInfo(); final RaftProperties properties = proxy.getProperties(); minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS); @@ -157,10 +157,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return proxy.getServerRpc(); } - private void setRole(Role newRole, String op) { + private void setRole(RaftPeerRole newRole, String op) { LOG.info("{} changes role from {} to {} at term {} for {}", getId(), this.role, newRole, state.getCurrentTerm(), op); - this.role = newRole; + this.role.transitionRole(newRole); } void start() { @@ -191,7 +191,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou * The peer belongs to the current configuration, should start as a follower */ private void startAsFollower() { - setRole(Role.FOLLOWER, "startAsFollower"); + setRole(RaftPeerRole.FOLLOWER, "startAsFollower"); startHeartbeatMonitor(); lifeCycle.transition(RUNNING); } @@ -202,7 +202,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou * start election. */ private void startInitializing() { - setRole(Role.FOLLOWER, "startInitializing"); + setRole(RaftPeerRole.FOLLOWER, "startInitializing"); // do not start heartbeatMonitoring } @@ -218,8 +218,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return getState().getSelfId(); } - Role getRole() { - return role; + RaftPeerRole getRole() { + return role.getCurrentRole(); } RaftConfiguration getRaftConf() { @@ -261,15 +261,15 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } public boolean isFollower() { - return role == Role.FOLLOWER; + return role.isFollower(); } public boolean isCandidate() { - return role == Role.CANDIDATE; + return role.isCandidate(); } public boolean isLeader() { - return role == Role.LEADER; + return role.isLeader(); } /** @@ -282,14 +282,14 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou */ synchronized boolean changeToFollower(long newTerm, boolean sync) throws IOException { - final Role old = role; + final RaftPeerRole old = role.getCurrentRole(); final boolean metadataUpdated = state.updateCurrentTerm(newTerm); - if (old != Role.FOLLOWER) { - setRole(Role.FOLLOWER, "changeToFollower"); - if (old == Role.LEADER) { + if (old != RaftPeerRole.FOLLOWER) { + setRole(RaftPeerRole.FOLLOWER, "changeToFollower"); + if (old == RaftPeerRole.LEADER) { shutdownLeaderState(false); - } else if (old == Role.CANDIDATE) { + } else if (old == RaftPeerRole.CANDIDATE) { shutdownElectionDaemon(); } startHeartbeatMonitor(); @@ -325,7 +325,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou synchronized void changeToLeader() { Preconditions.assertTrue(isCandidate()); shutdownElectionDaemon(); - setRole(Role.LEADER, "changeToLeader"); + setRole(RaftPeerRole.LEADER, "changeToLeader"); state.becomeLeader(); // start sending AppendEntries RPC to followers @@ -371,13 +371,15 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou ServerInformationReply getServerInformation(ServerInformationRequest request) { final RaftGroup group = new RaftGroup(groupId, getRaftConf().getPeers()); - return new ServerInformationReply(request, getCommitInfos(), group); + return new ServerInformationReply(request, role.getCurrentRole(), + role.getRoleElapsedTimeMs(), state.getStorage().getStorageDir().hasMetaFile(), + getCommitInfos(), group); } synchronized void changeToCandidate() { Preconditions.assertTrue(isFollower()); shutdownHeartbeatMonitor(); - setRole(Role.CANDIDATE, "changeToCandidate"); + setRole(RaftPeerRole.CANDIDATE, "changeToCandidate"); // start election electionDaemon = new LeaderElection(this); electionDaemon.start(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java new file mode 100644 index 0000000..e7a6e80 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ratis.server.impl; + +import org.apache.ratis.shaded.proto.RaftProtos.RaftPeerRole; +import org.apache.ratis.util.Timestamp; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Maintain the Role of a Raft Peer. + */ +public class RoleInfo { + + private volatile RaftPeerRole role; + private final AtomicReference<Timestamp> transitionTime; + + RoleInfo() { + this.transitionTime = new AtomicReference<>(new Timestamp()); + } + + public void transitionRole(RaftPeerRole newRole) { + this.role = newRole; + this.transitionTime.set(new Timestamp()); + } + + public long getRoleElapsedTimeMs() { + return transitionTime.get().elapsedTimeMs(); + } + + public RaftPeerRole getCurrentRole() { + return role; + } + + public boolean isFollower() { + return role == RaftPeerRole.FOLLOWER; + } + + public boolean isCandidate() { + return role == RaftPeerRole.CANDIDATE; + } + + public boolean isLeader() { + return role == RaftPeerRole.LEADER; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java index 24d78b7..bcb5823 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftStorageDirectory.java @@ -261,7 +261,7 @@ public class RaftStorageDirectory { } } - boolean hasMetaFile() throws IOException { + public boolean hasMetaFile() { return getMetaFile().exists(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java index a417912..be92532 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/TransactionContext.java @@ -18,7 +18,7 @@ package org.apache.ratis.statemachine; import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.server.RaftServer.Role; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; @@ -46,7 +46,7 @@ import java.util.Collection; */ public interface TransactionContext { /** @return the role of the server when this context is created. */ - Role getServerRole(); + RaftProtos.RaftPeerRole getServerRole(); /** * Returns the original request from the {@link RaftClientRequest} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/a3a83329/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index 08d3536..b49a443 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -20,7 +20,7 @@ package org.apache.ratis.statemachine.impl; import java.io.IOException; import java.util.Objects; import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.server.RaftServer.Role; +import org.apache.ratis.shaded.proto.RaftProtos; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; @@ -33,7 +33,7 @@ import org.apache.ratis.util.Preconditions; */ public class TransactionContextImpl implements TransactionContext { /** The role of the server when this object is created. */ - private final Role serverRole; + private final RaftProtos.RaftPeerRole serverRole; /** The {@link StateMachine} that originated the transaction. */ private final StateMachine stateMachine; @@ -64,7 +64,7 @@ public class TransactionContextImpl implements TransactionContext { /** Committed LogEntry. */ private LogEntryProto logEntry; - private TransactionContextImpl(Role serverRole, StateMachine stateMachine) { + private TransactionContextImpl(RaftProtos.RaftPeerRole serverRole, StateMachine stateMachine) { this.serverRole = serverRole; this.stateMachine = stateMachine; } @@ -85,7 +85,7 @@ public class TransactionContextImpl implements TransactionContext { public TransactionContextImpl( StateMachine stateMachine, RaftClientRequest clientRequest, SMLogEntryProto smLogEntryProto, Object stateMachineContext) { - this(Role.LEADER, stateMachine); + this(RaftProtos.RaftPeerRole.LEADER, stateMachine); this.clientRequest = clientRequest; this.smLogEntryProto = smLogEntryProto; this.stateMachineContext = stateMachineContext; @@ -96,14 +96,14 @@ public class TransactionContextImpl implements TransactionContext { * Used by followers for applying committed entries to the state machine. * @param logEntry the log entry to be applied */ - public TransactionContextImpl(Role serverRole, StateMachine stateMachine, LogEntryProto logEntry) { + public TransactionContextImpl(RaftProtos.RaftPeerRole serverRole, StateMachine stateMachine, LogEntryProto logEntry) { this(serverRole, stateMachine); setLogEntry(logEntry); this.smLogEntryProto = logEntry.getSmLogEntry(); } @Override - public Role getServerRole() { + public RaftProtos.RaftPeerRole getServerRole() { return serverRole; }
