Repository: incubator-ratis Updated Branches: refs/heads/master 86db875aa -> 67db67efd
RATIS-290. Raft server should notify the state machine if no leader is assigned for a long time. Contributed by Mukul Kumar Singh Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/67db67ef Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/67db67ef Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/67db67ef Branch: refs/heads/master Commit: 67db67efd4a32b1ec3054571c0a8d885f4db882b Parents: 86db875 Author: Tsz Wo Nicholas Sze <[email protected]> Authored: Mon Aug 13 15:08:04 2018 -0700 Committer: Tsz Wo Nicholas Sze <[email protected]> Committed: Mon Aug 13 15:08:04 2018 -0700 ---------------------------------------------------------------------- ratis-proto-shaded/src/main/proto/Raft.proto | 2 +- .../ratis/server/RaftServerConfigKeys.java | 15 +++ .../apache/ratis/server/impl/LogAppender.java | 2 +- .../ratis/server/impl/RaftServerImpl.java | 14 ++- .../apache/ratis/server/impl/ServerState.java | 33 ++++++- .../apache/ratis/statemachine/StateMachine.java | 16 +++- .../TestRaftServerLeaderElectionTimeout.java | 98 ++++++++++++++++++++ .../ratis/TestRaftServerSlownessDetection.java | 2 +- .../SimpleStateMachine4Testing.java | 14 ++- 9 files changed, 184 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 09fd2fd..039a3f6 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -270,7 +270,7 @@ message FollowerInfoProto { } message CandidateInfoProto { - // nothing to add for candidate + uint64 lastLeaderElapsedTimeMs = 1; } message RoleInfoProto { 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 9e7d83a..4d5abf5 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -54,6 +54,21 @@ public interface RaftServerConfigKeys { setInt(properties::setInt, STAGING_CATCHUP_GAP_KEY, stagingCatchupGap); } + /** + * Timeout for leader election after which the statemachine of the server is notified + * about leader election pending for a long time. + */ + String LEADER_ELECTION_TIMEOUT_KEY = PREFIX + ".leader.election.timeout"; + TimeDuration LEADER_ELECTION_TIMEOUT_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS); + static TimeDuration leaderElectionTimeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(LEADER_ELECTION_TIMEOUT_DEFAULT.getUnit()), + LEADER_ELECTION_TIMEOUT_KEY, LEADER_ELECTION_TIMEOUT_DEFAULT); + } + static void setLeaderElectionTimeout(RaftProperties properties, TimeDuration leaderElectionTimeout) { + setTimeDuration(properties::setTimeDuration, LEADER_ELECTION_TIMEOUT_KEY, leaderElectionTimeout); + + } + interface Log { String PREFIX = RaftServerConfigKeys.PREFIX + ".log"; http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index b0d47e2..58f5525 100644 --- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -499,7 +499,7 @@ public class LogAppender { protected void checkSlowness() { if (follower.isSlow()) { - server.getStateMachine().notifySlowness(server.getRaftConf(), server.getRoleInfoProto()); + server.getStateMachine().notifySlowness(server.getGroup(), server.getRoleInfoProto()); } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index f2114b9..291c5fe 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -229,6 +229,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return getState().getRaftConf(); } + RaftGroup getGroup() { + return new RaftGroup(groupId, getRaftConf().getPeers()); + } + void shutdown() { lifeCycle.checkStateAndClose(() -> { try { @@ -373,9 +377,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou } ServerInformationReply getServerInformation(ServerInformationRequest request) { - final RaftGroup group = new RaftGroup(groupId, getRaftConf().getPeers()); return new ServerInformationReply(request, getRoleInfoProto(), - state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), group); + state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), getGroup()); } public RoleInfoProto getRoleInfoProto() { @@ -386,7 +389,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou .setRoleElapsedTimeMs(role.getRoleElapsedTimeMs()); switch 
(currentRole) { case CANDIDATE: - roleInfo.setCandidateInfo(CandidateInfoProto.getDefaultInstance()); + CandidateInfoProto.Builder candidate = CandidateInfoProto.newBuilder() + .setLastLeaderElapsedTimeMs(state.getLastLeaderElapsedTimeMs()); + roleInfo.setCandidateInfo(candidate); break; case FOLLOWER: @@ -429,6 +434,9 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou Preconditions.assertTrue(isFollower()); shutdownHeartbeatMonitor(); setRole(RaftPeerRole.CANDIDATE, "changeToCandidate"); + if (state.checkForExtendedNoLeader()) { + stateMachine.notifyExtendedNoLeader(getGroup(), getRoleInfoProto()); + } // start election electionDaemon = new LeaderElection(this); electionDaemon.start(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index eb67fac..c5a6a98 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -28,11 +28,14 @@ import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.ProtoUtils; +import org.apache.ratis.util.Timestamp; import java.io.Closeable; import java.io.File; import java.io.IOException; import java.util.Objects; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import static org.apache.ratis.server.impl.RaftServerImpl.LOG; @@ -53,6 +56,8 @@ public class ServerState implements Closeable { /** local storage for log and snapshot */ private final RaftStorage storage; private 
final SnapshotManager snapshotManager; + private volatile Timestamp lastNoLeaderTime; + private final long leaderElectionTimeoutMs; /** * Latest term server has seen. initialized to 0 on first boot, increases @@ -92,7 +97,12 @@ public class ServerState implements Closeable { long lastApplied = initStatemachine(stateMachine, group.getGroupId()); + // On start the leader is null, start the clock now leaderId = null; + this.lastNoLeaderTime = new Timestamp(); + this.leaderElectionTimeoutMs = + RaftServerConfigKeys.leaderElectionTimeout(prop).toInt(TimeUnit.MILLISECONDS); + // we cannot apply log entries to the state machine in this step, since we // do not know whether the local log entries have been committed. log = initLog(id, prop, lastApplied, entry -> { @@ -207,12 +217,31 @@ public class ServerState implements Closeable { void setLeader(RaftPeerId newLeaderId, String op) { if (!Objects.equals(leaderId, newLeaderId)) { - LOG.info("{}: change Leader from {} to {} at term {} for {}", - selfId, leaderId, newLeaderId, getCurrentTerm(), op); + String suffix; + if (newLeaderId == null) { + // reset the time stamp when a null leader is assigned + lastNoLeaderTime = new Timestamp(); + suffix = ""; + } else { + Timestamp previous = lastNoLeaderTime; + lastNoLeaderTime = null; + suffix = ", leader elected after " + previous.elapsedTimeMs() + "ms"; + } + LOG.info("{}: change Leader from {} to {} at term {} for {}{}", + selfId, leaderId, newLeaderId, getCurrentTerm(), op, suffix); leaderId = newLeaderId; } } + boolean checkForExtendedNoLeader() { + return getLastLeaderElapsedTimeMs() > leaderElectionTimeoutMs; + } + + long getLastLeaderElapsedTimeMs() { + final Timestamp t = lastNoLeaderTime; + return t == null ? 
0 : t.elapsedTimeMs(); + } + + void becomeLeader() { setLeader(selfId, "becomeLeader"); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 31ca468..88b5276 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -19,6 +19,7 @@ package org.apache.ratis.statemachine; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; @@ -213,10 +214,21 @@ public interface StateMachine extends Closeable { * Notify the Leader's state machine that one of the followers is slow * this notification is based on "raft.server.rpc.slowness.timeout" * - * @param raftConfiguration raft configuration + * @param group raft group information * @param roleInfoProto information about the current node role and rpc delay information */ - default void notifySlowness(RaftConfiguration raftConfiguration, RoleInfoProto roleInfoProto) { + default void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) { + + } + + /** + * Notify the candidate's state machine that no leader has been elected for a long time; + * this notification is based on "raft.server.leader.election.timeout" + * + * @param group raft group information + * @param roleInfoProto information about the current node role and rpc delay information + */ + default void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) { } } 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java new file mode 100644 index 0000000..67156a1 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerLeaderElectionTimeout.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; +import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +/** + * Test Raft Server Leader election timeout detection and notification to state machine. + */ +public class TestRaftServerLeaderElectionTimeout extends BaseTest { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + public static final int NUM_SERVERS = 3; + + protected static final RaftProperties properties = new RaftProperties(); + + private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc + .FACTORY.newCluster(NUM_SERVERS, getProperties()); + + public RaftProperties getProperties() { + RaftServerConfigKeys + .setLeaderElectionTimeout(properties, TimeDuration.valueOf(1, TimeUnit.SECONDS)); + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + return properties; + } + + @Before + public void setup() { + Assert.assertNull(cluster.getLeader()); + cluster.start(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testLeaderElectionDetection() throws Exception { + RaftTestUtil.waitForLeader(cluster); + long leaderElectionTimeout = RaftServerConfigKeys. 
+ leaderElectionTimeout(cluster.getProperties()).toInt(TimeUnit.MILLISECONDS); + + RaftServerImpl healthyFollower = cluster.getFollowers().get(1); + RaftServerImpl failedFollower = cluster.getFollowers().get(0); + // fail the leader and one of the followers so that a quorum is not present + // for the next leader election to succeed. + cluster.killServer(failedFollower.getId()); + cluster.killServer(cluster.getLeader().getId()); + + // Wait to ensure that leader election is triggered and also state machine callback is triggered + Thread.sleep( leaderElectionTimeout * 2); + + RaftProtos.RoleInfoProto roleInfoProto = + SimpleStateMachine4Testing.get(healthyFollower).getLeaderElectionTimeoutInfo(); + Assert.assertNotNull(roleInfoProto); + + Assert.assertEquals(roleInfoProto.getRole(), RaftProtos.RaftPeerRole.CANDIDATE); + Assert.assertTrue(roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() > leaderElectionTimeout); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java index 17c41a5..811e7de 100644 --- a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java @@ -84,7 +84,7 @@ public class TestRaftServerSlownessDetection extends BaseTest { RaftServerImpl failedFollower = cluster.getFollowers().get(0); // fail the node and wait for the callback to be triggered - failedFollower.getProxy().close(); + cluster.killServer(failedFollower.getId()); Thread.sleep( slownessTimeout * 2); // Followers should not get any failed not notification 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/67db67ef/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index 9f6efae..5554da2 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -26,7 +26,7 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.server.impl.RaftConfiguration; +import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.protocol.TermIndex; @@ -86,6 +86,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { private final Semaphore blockingSemaphore = new Semaphore(1); private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX; private RoleInfoProto slownessInfo = null; + private RoleInfoProto leaderElectionTimeoutInfo = null; SimpleStateMachine4Testing() { checkpointer = new Daemon(() -> { @@ -106,6 +107,10 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { return slownessInfo; } + public RoleInfoProto getLeaderElectionTimeoutInfo() { + return leaderElectionTimeoutInfo; + } + @Override public synchronized void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException { @@ -326,7 +331,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { } @Override - public void notifySlowness(RaftConfiguration 
raftConfiguration, RoleInfoProto roleInfoProto) { + public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) { slownessInfo = roleInfoProto; } + + @Override + public void notifyExtendedNoLeader(RaftGroup group, RoleInfoProto roleInfoProto) { + leaderElectionTimeoutInfo = roleInfoProto; + } }
