Repository: incubator-ratis
Updated Branches:
  refs/heads/master 0236eea30 -> ce88606a1


RATIS-285. Ratis State Machine should have an api to inform about loss of a 
follower/slow follower. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/ce88606a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/ce88606a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/ce88606a

Branch: refs/heads/master
Commit: ce88606a1c94622255ccadb1fdd474e48793e712
Parents: 0236eea
Author: Mukul Kumar Singh <[email protected]>
Authored: Tue Aug 7 00:11:38 2018 +0530
Committer: Mukul Kumar Singh <[email protected]>
Committed: Tue Aug 7 00:11:38 2018 +0530

----------------------------------------------------------------------
 .../ratis/grpc/server/GRpcLogAppender.java      |   1 +
 ratis-proto-shaded/src/main/proto/Raft.proto    |   6 +-
 .../ratis/server/RaftServerConfigKeys.java      |  10 ++
 .../apache/ratis/server/impl/FollowerInfo.java  |   8 +-
 .../apache/ratis/server/impl/LogAppender.java   |   7 ++
 .../ratis/server/impl/RaftServerImpl.java       |  16 +--
 .../apache/ratis/statemachine/StateMachine.java |  12 +++
 .../ratis/TestRaftServerSlownessDetection.java  | 107 +++++++++++++++++++
 .../SimpleStateMachine4Testing.java             |  12 +++
 9 files changed, 168 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java 
b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
index 7d144e9..595e061 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GRpcLogAppender.java
@@ -92,6 +92,7 @@ public class GRpcLogAppender extends LogAppender {
           appendLog();
         }
       }
+      checkSlowness();
     }
 
     
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto 
b/ratis-proto-shaded/src/main/proto/Raft.proto
index 62202be..09fd2fd 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -255,17 +255,17 @@ message ServerInformationRequestProto {
   RaftRpcRequestProto rpcRequest = 1;
 }
 
-message ServerRpcDelayProto {
+message ServerRpcProto {
   RaftPeerProto id = 1;
   uint64 lastRpcElapsedTimeMs = 2;
 }
 
 message LeaderInfoProto {
-  repeated ServerRpcDelayProto followerInfo = 1;
+  repeated ServerRpcProto followerInfo = 1;
 }
 
 message FollowerInfoProto {
-  ServerRpcDelayProto leaderInfo = 1;
+  ServerRpcProto leaderInfo = 1;
   bool inLogSync = 2;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index a18f9f1..9e7d83a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -223,6 +223,16 @@ public interface RaftServerConfigKeys {
     static void setSleepTime(RaftProperties properties, TimeDuration 
sleepTime) {
       setTimeDuration(properties::setTimeDuration, SLEEP_TIME_KEY, sleepTime);
     }
+
+    String SLOWNESS_TIMEOUT_KEY = PREFIX + "slowness.timeout";
+    TimeDuration SLOWNESS_TIMEOUT_DEFAULT = TimeDuration.valueOf(60, 
TimeUnit.SECONDS);
+    static TimeDuration slownessTimeout(RaftProperties properties) {
+      return 
getTimeDuration(properties.getTimeDuration(SLOWNESS_TIMEOUT_DEFAULT.getUnit()),
+          SLOWNESS_TIMEOUT_KEY, SLOWNESS_TIMEOUT_DEFAULT);
+    }
+    static void setSlownessTimeout(RaftProperties properties, TimeDuration 
expiryTime) {
+      setTimeDuration(properties::setTimeDuration, SLOWNESS_TIMEOUT_KEY, 
expiryTime);
+    }
   }
 
   /** server retry cache related */

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
index 6bb8e5b..254319a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
@@ -32,15 +32,17 @@ public class FollowerInfo {
   private final AtomicLong matchIndex;
   private final AtomicLong commitIndex = new 
AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
   private volatile boolean attendVote;
+  private final int rpcSlownessTimeoutMs;
 
   FollowerInfo(RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
-      boolean attendVote) {
+      boolean attendVote, int rpcSlownessTimeoutMs) {
     this.peer = peer;
     this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
     this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
     this.nextIndex = nextIndex;
     this.matchIndex = new AtomicLong(0);
     this.attendVote = attendVote;
+    this.rpcSlownessTimeoutMs = rpcSlownessTimeoutMs;
   }
 
   public void updateMatchIndex(final long matchIndex) {
@@ -114,4 +116,8 @@ public class FollowerInfo {
   public Timestamp getLastRpcTime() {
     return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
   }
+
+  public boolean isSlow() {
+    return lastRpcResponseTime.get().elapsedTimeMs() > rpcSlownessTimeoutMs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index b863bad..b0d47e2 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -448,6 +448,7 @@ public class LogAppender {
           }
         }
       }
+      checkSlowness();
     }
   }
 
@@ -496,6 +497,12 @@ public class LogAppender {
     leaderState.submitUpdateStateEvent(e);
   }
 
+  protected void checkSlowness() {
+    if (follower.isSlow()) {
+      server.getStateMachine().notifySlowness(server.getRaftConf(), 
server.getRoleInfoProto());
+    }
+  }
+
   public synchronized void notifyAppend() {
     this.notify();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 2ef8125..3879f4a 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -65,6 +65,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   private final StateMachine stateMachine;
   private final int minTimeoutMs;
   private final int maxTimeoutMs;
+  private final int rpcSlownessTimeoutMs;
 
   private final LifeCycle lifeCycle;
   private final ServerState state;
@@ -97,6 +98,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
     final RaftProperties properties = proxy.getProperties();
     minTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMin(properties).toInt(TimeUnit.MILLISECONDS);
     maxTimeoutMs = 
RaftServerConfigKeys.Rpc.timeoutMax(properties).toInt(TimeUnit.MILLISECONDS);
+    rpcSlownessTimeoutMs = 
RaftServerConfigKeys.Rpc.slownessTimeout(properties).toInt(TimeUnit.MILLISECONDS);
     Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
         "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
     this.proxy = proxy;
@@ -116,7 +118,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
   LogAppender newLogAppender(
       LeaderState state, RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
       boolean attendVote) {
-    final FollowerInfo f = new FollowerInfo(peer, lastRpcTime, nextIndex, 
attendVote);
+    final FollowerInfo f = new FollowerInfo(peer, lastRpcTime, nextIndex, 
attendVote, rpcSlownessTimeoutMs);
     return getProxy().getFactory().newLogAppender(this, state, f);
   }
 
@@ -376,7 +378,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
         state.getStorage().getStorageDir().hasMetaFile(), getCommitInfos(), 
group);
   }
 
-  private RoleInfoProto getRoleInfoProto() {
+  public RoleInfoProto getRoleInfoProto() {
     RaftPeerRole currentRole = role.getCurrentRole();
     RoleInfoProto.Builder roleInfo = RoleInfoProto.newBuilder()
         .setSelf(ProtoUtils.toRaftPeerProto(getPeer()))
@@ -389,7 +391,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
 
     case FOLLOWER:
       FollowerInfoProto.Builder follower = FollowerInfoProto.newBuilder()
-          .setLeaderInfo(getServerRpcDelayProto(
+          .setLeaderInfo(getServerRpcProto(
               getRaftConf().getPeer(state.getLeaderId()),
               heartbeatMonitor.getLastRpcTime().elapsedTimeMs()))
           .setInLogSync(heartbeatMonitor.isInLogSync());
@@ -400,7 +402,7 @@ public class RaftServerImpl implements RaftServerProtocol, 
RaftServerAsynchronou
       LeaderInfoProto.Builder leader = LeaderInfoProto.newBuilder();
       Stream<LogAppender> stream = getLeaderState().getLogAppenders();
       stream.forEach(appender ->
-          leader.addFollowerInfo(getServerRpcDelayProto(
+          leader.addFollowerInfo(getServerRpcProto(
               appender.getFollower().getPeer(),
               
appender.getFollower().getLastRpcResponseTime().elapsedTimeMs())));
       roleInfo.setLeaderInfo(leader);
@@ -412,12 +414,12 @@ public class RaftServerImpl implements 
RaftServerProtocol, RaftServerAsynchronou
     return roleInfo.build();
   }
 
-  private ServerRpcDelayProto getServerRpcDelayProto (RaftPeer peer, long 
delay) {
+  private ServerRpcProto getServerRpcProto(RaftPeer peer, long delay) {
     if (peer == null) {
       // if no peer information return empty
-      return ServerRpcDelayProto.getDefaultInstance();
+      return ServerRpcProto.getDefaultInstance();
     }
-    return ServerRpcDelayProto.newBuilder()
+    return ServerRpcProto.newBuilder()
         .setId(ProtoUtils.toRaftPeerProto(peer))
         .setLastRpcElapsedTimeMs(delay)
         .build();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java 
b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index 4c2e64d..31ca468 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -25,6 +25,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.util.LifeCycle;
 import org.slf4j.Logger;
@@ -207,4 +208,15 @@ public interface StateMachine extends Closeable {
    * Notify the state machine that the raft peer is no longer leader.
    */
   void notifyNotLeader(Collection<TransactionContext> pendingEntries) throws 
IOException;
+
+  /**
+   * Notify the Leader's state machine that one of the followers is slow.
+   * This notification is based on "raft.server.rpc.slowness.timeout".
+   *
+   * @param raftConfiguration raft configuration
+   * @param roleInfoProto information about the current node role and rpc 
delay information
+   */
+  default void notifySlowness(RaftConfiguration raftConfiguration, 
RoleInfoProto roleInfoProto) {
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
new file mode 100644
index 0000000..17c41a5
--- /dev/null
+++ 
b/ratis-server/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test Raft Server Slowness detection and notification to Leader's 
statemachine.
+ */
+public class TestRaftServerSlownessDetection extends BaseTest {
+  static {
+    LogUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
+  }
+
+  public static final int NUM_SERVERS = 3;
+
+  protected static final RaftProperties properties = new RaftProperties();
+
+  private final MiniRaftClusterWithSimulatedRpc cluster = 
MiniRaftClusterWithSimulatedRpc
+      .FACTORY.newCluster(NUM_SERVERS, getProperties());
+
+  public RaftProperties getProperties() {
+    RaftServerConfigKeys.Rpc
+        .setSlownessTimeout(properties, TimeDuration.valueOf(1, 
TimeUnit.SECONDS));
+    properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        SimpleStateMachine4Testing.class, StateMachine.class);
+    return properties;
+  }
+
+  @Before
+  public void setup() {
+    Assert.assertNull(cluster.getLeader());
+    cluster.start();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testSlownessDetection() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    long slownessTimeout = RaftServerConfigKeys.Rpc
+        .slownessTimeout(cluster.getProperties()).toInt(TimeUnit.MILLISECONDS);
+    RaftServerImpl failedFollower = cluster.getFollowers().get(0);
+
+    // fail the node and wait for the callback to be triggered
+    failedFollower.getProxy().close();
+    Thread.sleep( slownessTimeout * 2);
+
+    // Followers should not get any notification about the failed node
+    for (RaftServerImpl followerServer : cluster.getFollowers()) {
+      
Assert.assertNull(SimpleStateMachine4Testing.get(followerServer).getSlownessInfo());
+    }
+    // the leader should get notification that the follower has failed now
+    RaftProtos.RoleInfoProto roleInfoProto =
+        SimpleStateMachine4Testing.get(cluster.getLeader()).getSlownessInfo();
+    Assert.assertNotNull(roleInfoProto);
+
+    List<RaftProtos.ServerRpcProto> followers =
+        roleInfoProto.getLeaderInfo().getFollowerInfoList();
+    // Assert that the node that was shut down is lagging behind
+    for (RaftProtos.ServerRpcProto serverProto : followers) {
+      Assert.assertTrue(!(RaftPeerId.valueOf(serverProto.getId().getId()) == 
failedFollower.getId()) ||
+          serverProto.getLastRpcElapsedTimeMs() > slownessTimeout);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/ce88606a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index e39a9c8..9f6efae 100644
--- 
a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ 
b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -33,6 +34,7 @@ import org.apache.ratis.server.storage.LogInputStream;
 import org.apache.ratis.server.storage.LogOutputStream;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
@@ -83,6 +85,7 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
   private volatile boolean blockAppend = false;
   private final Semaphore blockingSemaphore = new Semaphore(1);
   private long endIndexLastCkpt = RaftServerConstants.INVALID_LOG_INDEX;
+  private RoleInfoProto slownessInfo = null;
 
   SimpleStateMachine4Testing() {
     checkpointer = new Daemon(() -> {
@@ -99,6 +102,10 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
     });
   }
 
+  public RoleInfoProto getSlownessInfo() {
+    return slownessInfo;
+  }
+
   @Override
   public synchronized void initialize(RaftServer server, RaftGroupId groupId,
       RaftStorage raftStorage) throws IOException {
@@ -317,4 +324,9 @@ public class SimpleStateMachine4Testing extends 
BaseStateMachine {
       blockingSemaphore.release();
     }
   }
+
+  @Override
+  public void notifySlowness(RaftConfiguration raftConfiguration, 
RoleInfoProto roleInfoProto) {
+    slownessInfo = roleInfoProto;
+  }
 }

Reply via email to