Repository: incubator-ratis Updated Branches: refs/heads/master 0236eea30 -> ce88606a1
RATIS-285. Ratis State Machine should have an api to inform about loss of a follower/slow follower. Contributed by Mukul Kumar Singh. Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ce88606a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ce88606a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ce88606a Branch: refs/heads/master Commit: ce88606a1c94622255ccadb1fdd474e48793e712 Parents: 0236eea Author: Mukul Kumar Singh <[email protected]> Authored: Tue Aug 7 00:11:38 2018 +0530 Committer: Mukul Kumar Singh <[email protected]> Committed: Tue Aug 7 00:11:38 2018 +0530 ---------------------------------------------------------------------- .../ratis/grpc/server/GRpcLogAppender.java | 1 + ratis-proto-shaded/src/main/proto/Raft.proto | 6 +- .../ratis/server/RaftServerConfigKeys.java | 10 ++ .../apache/ratis/server/impl/FollowerInfo.java | 8 +- .../apache/ratis/server/impl/LogAppender.java | 7 ++ .../ratis/server/impl/RaftServerImpl.java | 16 +-- .../apache/ratis/statemachine/StateMachine.java | 12 +++ .../ratis/TestRaftServerSlownessDetection.java | 107 +++++++++++++++++++ .../SimpleStateMachine4Testing.java | 12 +++ 9 files changed, 168 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java index 7d144e9..595e061 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java @@ -92,6 +92,7 @@ public class GRpcLogAppender 
extends LogAppender { appendLog(); } } + checkSlowness(); } Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-proto-shaded/src/main/proto/Raft.proto ---------------------------------------------------------------------- diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto index 62202be..09fd2fd 100644 --- a/ratis-proto-shaded/src/main/proto/Raft.proto +++ b/ratis-proto-shaded/src/main/proto/Raft.proto @@ -255,17 +255,17 @@ message ServerInformationRequestProto { RaftRpcRequestProto rpcRequest = 1; } -message ServerRpcDelayProto { +message ServerRpcProto { RaftPeerProto id = 1; uint64 lastRpcElapsedTimeMs = 2; } message LeaderInfoProto { - repeated ServerRpcDelayProto followerInfo = 1; + repeated ServerRpcProto followerInfo = 1; } message FollowerInfoProto { - ServerRpcDelayProto leaderInfo = 1; + ServerRpcProto leaderInfo = 1; bool inLogSync = 2; } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index a18f9f1..9e7d83a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -223,6 +223,16 @@ public interface RaftServerConfigKeys { static void setSleepTime(RaftProperties properties, TimeDuration sleepTime) { setTimeDuration(properties::setTimeDuration, SLEEP_TIME_KEY, sleepTime); } + + String SLOWNESS_TIMEOUT_KEY = PREFIX + "slowness.timeout"; + TimeDuration SLOWNESS_TIMEOUT_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS); + static TimeDuration 
slownessTimeout(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(SLOWNESS_TIMEOUT_DEFAULT.getUnit()), + SLOWNESS_TIMEOUT_KEY, SLOWNESS_TIMEOUT_DEFAULT); + } + static void setSlownessTimeout(RaftProperties properties, TimeDuration expiryTime) { + setTimeDuration(properties::setTimeDuration, SLOWNESS_TIMEOUT_KEY, expiryTime); + } } /** server retry cache related */ http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java index 6bb8e5b..254319a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java @@ -32,15 +32,17 @@ public class FollowerInfo { private final AtomicLong matchIndex; private final AtomicLong commitIndex = new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX); private volatile boolean attendVote; + private final int rpcSlownessTimeoutMs; FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex, - boolean attendVote) { + boolean attendVote, int rpcSlownessTimeoutMs) { this.peer = peer; this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime); this.lastRpcSendTime = new AtomicReference<>(lastRpcTime); this.nextIndex = nextIndex; this.matchIndex = new AtomicLong(0); this.attendVote = attendVote; + this.rpcSlownessTimeoutMs = rpcSlownessTimeoutMs; } public void updateMatchIndex(final long matchIndex) { @@ -114,4 +116,8 @@ public class FollowerInfo { public Timestamp getLastRpcTime() { return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get()); } + + public boolean isSlow() { + return lastRpcResponseTime.get().elapsedTimeMs() > rpcSlownessTimeoutMs; + } } 
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java index b863bad..b0d47e2 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java @@ -448,6 +448,7 @@ public class LogAppender { } } } + checkSlowness(); } } @@ -496,6 +497,12 @@ public class LogAppender { leaderState.submitUpdateStateEvent(e); } + protected void checkSlowness() { + if (follower.isSlow()) { + server.getStateMachine().notifySlowness(server.getRaftConf(), server.getRoleInfoProto()); + } + } + public synchronized void notifyAppend() { this.notify(); } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 2ef8125..3879f4a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -65,6 +65,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou private final StateMachine stateMachine; private final int minTimeoutMs; private final int maxTimeoutMs; + private final int rpcSlownessTimeoutMs; private final LifeCycle lifeCycle; private final ServerState state; @@ -97,6 +98,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou final RaftProperties properties = proxy.getProperties(); 
minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS); maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS); + rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toInt(TimeUnit.MILLISECONDS); Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs, "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs); this.proxy = proxy; @@ -116,7 +118,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou LogAppender newLogAppender( LeaderState state, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) { - final FollowerInfo f = new FollowerInfo(peer, lastRpcTime, nextIndex, attendVote); + final FollowerInfo f = new FollowerInfo(peer, lastRpcTime, nextIndex, attendVote, rpcSlownessTimeoutMs); return getProxy().getFactory().newLogAppender(this, state, f); } @@ -376,7 +378,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), group); } - private RoleInfoProto getRoleInfoProto() { + public RoleInfoProto getRoleInfoProto() { RaftPeerRole currentRole = role.getCurrentRole(); RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder() .setSelf(ProtoUtils.toRaftPeerProto(getPeer())) @@ -389,7 +391,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou case FOLLOWER: FollowerInfoProto.Builder follower = FollowerInfoProto.newBuilder() - .setLeaderInfo(getServerRpcDelayProto( + .setLeaderInfo(getServerRpcProto( getRaftConf().getPeer(state.getLeaderId()), heartbeatMonitor.getLastRpcTime().elapsedTimeMs())) .setInLogSync(heartbeatMonitor.isInLogSync()); @@ -400,7 +402,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder(); Stream<LogAppender> stream = getLeaderState().getLogAppenders(); stream.forEach(appender -> 
- leader.addFollowerInfo(getServerRpcDelayProto( + leader.addFollowerInfo(getServerRpcProto( appender.getFollower().getPeer(), appender.getFollower().getLastRpcResponseTime().elapsedTimeMs()))); roleInfo.setLeaderInfo(leader); @@ -412,12 +414,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou return roleInfo.build(); } - private ServerRpcDelayProto getServerRpcDelayProto (RaftPeer peer, long delay) { + private ServerRpcProto getServerRpcProto(RaftPeer peer, long delay) { if (peer == null) { // if no peer information return empty - return ServerRpcDelayProto.getDefaultInstance(); + return ServerRpcProto.getDefaultInstance(); } - return ServerRpcDelayProto.newBuilder() + return ServerRpcProto.newBuilder() .setId(ProtoUtils.toRaftPeerProto(peer)) .setLastRpcElapsedTimeMs(delay) .build(); http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 4c2e64d..31ca468 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -25,6 +25,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.util.LifeCycle; import org.slf4j.Logger; @@ -207,4 +208,15 @@ public interface StateMachine extends Closeable { * Notify the state machine that the raft peer is no longer leader. 
*/ void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws IOException; + + /** + * Notify the Leader's state machine that one of the followers is slow + * this notification is based on "raft.server.rpc.slowness.timeout" + * + * @param raftConfiguration raft configuration + * @param roleInfoProto information about the current node role and rpc delay information + */ + default void notifySlowness(RaftConfiguration raftConfiguration, RoleInfoProto roleInfoProto) { + + } } http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java new file mode 100644 index 0000000..17c41a5 --- /dev/null +++ b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ratis; + +import org.apache.log4j.Level; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc; +import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.statemachine.SimpleStateMachine4Testing; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.LogUtils; +import org.apache.ratis.util.TimeDuration; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Test Raft Server Slowness detection and notification to Leader's statemachine. + */ +public class TestRaftServerSlownessDetection extends BaseTest { + static { + LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG); + LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG); + } + + public static final int NUM_SERVERS = 3; + + protected static final RaftProperties properties = new RaftProperties(); + + private final MiniRaftClusterWithSimulatedRpc cluster = MiniRaftClusterWithSimulatedRpc + .FACTORY.newCluster(NUM_SERVERS, getProperties()); + + public RaftProperties getProperties() { + RaftServerConfigKeys.Rpc + .setSlownessTimeout(properties, TimeDuration.valueOf(1, TimeUnit.SECONDS)); + properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + SimpleStateMachine4Testing.class, StateMachine.class); + return properties; + } + + @Before + public void setup() { + Assert.assertNull(cluster.getLeader()); + cluster.start(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testSlownessDetection() throws Exception { + RaftTestUtil.waitForLeader(cluster); + long 
slownessTimeout = RaftServerConfigKeys.Rpc + .slownessTimeout(cluster.getProperties()).toInt(TimeUnit.MILLISECONDS); + RaftServerImpl failedFollower = cluster.getFollowers().get(0); + + // fail the node and wait for the callback to be triggered + failedFollower.getProxy().close(); + Thread.sleep( slownessTimeout * 2); + + // Followers should not get any failure notification + for (RaftServerImpl followerServer : cluster.getFollowers()) { + Assert.assertNull(SimpleStateMachine4Testing.get(followerServer).getSlownessInfo()); + } + // the leader should get notification that the follower has failed now + RaftProtos.RoleInfoProto roleInfoProto = + SimpleStateMachine4Testing.get(cluster.getLeader()).getSlownessInfo(); + Assert.assertNotNull(roleInfoProto); + + List<RaftProtos.ServerRpcProto> followers = + roleInfoProto.getLeaderInfo().getFollowerInfoList(); + //Assert that the node shutdown is lagging behind + for (RaftProtos.ServerRpcProto serverProto : followers) { + Assert.assertTrue(!(RaftPeerId.valueOf(serverProto.getId().getId()) == failedFollower.getId()) || + serverProto.getLastRpcElapsedTimeMs() > slownessTimeout); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java ---------------------------------------------------------------------- diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java index e39a9c8..9f6efae 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java @@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.StateMachineException; import org.apache.ratis.server.RaftServer; import 
org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.impl.RaftConfiguration; import org.apache.ratis.server.impl.RaftServerConstants; import org.apache.ratis.server.impl.RaftServerImpl; import org.apache.ratis.server.protocol.TermIndex; @@ -33,6 +34,7 @@ import org.apache.ratis.server.storage.LogInputStream; import org.apache.ratis.server.storage.LogOutputStream; import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.impl.BaseStateMachine; @@ -83,6 +85,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { private volatile boolean blockAppend = false; private final Semaphore blockingSemaphore = new Semaphore(1); private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX; + private RoleInfoProto slownessInfo = null; SimpleStateMachine4Testing() { checkpointer = new Daemon(() -> { @@ -99,6 +102,10 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { }); } + public RoleInfoProto getSlownessInfo() { + return slownessInfo; + } + @Override public synchronized void initialize(RaftServer server, RaftGroupId groupId, RaftStorage raftStorage) throws IOException { @@ -317,4 +324,9 @@ public class SimpleStateMachine4Testing extends BaseStateMachine { blockingSemaphore.release(); } } + + @Override + public void notifySlowness(RaftConfiguration raftConfiguration, RoleInfoProto roleInfoProto) { + slownessInfo = roleInfoProto; + } }
