This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 837b063  RATIS-1411.  Alleviate slow follower issue (#508)
837b063 is described below

commit 837b063a4e11edd241759c7b967af44120b25f60
Author: Sammi Chen <[email protected]>
AuthorDate: Thu Sep 30 18:34:56 2021 +0800

    RATIS-1411.  Alleviate slow follower issue (#508)
---
 .../main/java/org/apache/ratis/conf/ConfUtils.java | 23 ++++++++++++++++++
 .../apache/ratis/server/RaftServerConfigKeys.java  | 12 ++++++++++
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 28 ++++++++++++++++++++--
 3 files changed, 61 insertions(+), 2 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java 
b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index 989070f..1d7d9c5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -68,6 +68,15 @@ public interface ConfUtils {
     };
   }
 
+  static BiConsumer<String, Double> requireMax(double max) {
+    return (key, value) -> {
+      if (value > max) {
+        throw new IllegalArgumentException(
+            key + " = " + value + " > max = " + max);
+      }
+    };
+  }
+
   static BiConsumer<String, Long> requireMin(SizeInBytes min) {
     return requireMin(min.getSize());
   }
@@ -150,6 +159,13 @@ public interface ConfUtils {
   }
 
   @SafeVarargs
+  static double getDouble(
+      BiFunction<String, Double, Double> doubleGetter,
+      String key, double defaultValue, Consumer<String> logger, 
BiConsumer<String, Double>... assertions) {
+    return get(doubleGetter, key, defaultValue, logger, assertions);
+  }
+
+  @SafeVarargs
   static File getFile(
       BiFunction<String, File, File> fileGetter,
       String key, File defaultValue, Consumer<String> logger, 
BiConsumer<String, File>... assertions) {
@@ -219,6 +235,13 @@ public interface ConfUtils {
   }
 
   @SafeVarargs
+  static void setDouble(
+      BiConsumer<String, Double> doubleSetter, String key, double value,
+      BiConsumer<String, Double>... assertions) {
+    set(doubleSetter, key, value, assertions);
+  }
+
+  @SafeVarargs
   static void setFile(
       BiConsumer<String, File> fileSetter, String key, File value,
       BiConsumer<String, File>... assertions) {
diff --git 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 7748413..f27fda2 100644
--- 
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ 
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -119,6 +119,18 @@ public interface RaftServerConfigKeys {
     static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit) 
{
       setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit, 
requireMin(1L));
     }
+
+    String FOLLOWER_GAP_RATIO_MAX_KEY = PREFIX + ".follower.gap.ratio.max";
+    // The valid range is (0, 1]; the special value -1 disables this feature.
+    double FOLLOWER_GAP_RATIO_MAX_DEFAULT = -1d;
+
+    static double followerGapRatioMax(RaftProperties properties) {
+      return getDouble(properties::getDouble, FOLLOWER_GAP_RATIO_MAX_KEY,
+          FOLLOWER_GAP_RATIO_MAX_DEFAULT, getDefaultLog(), requireMax(1d));
+    }
+    static void setFollowerGapRatioMax(RaftProperties properties, float ratio) 
{
+      setDouble(properties::setDouble, FOLLOWER_GAP_RATIO_MAX_KEY, ratio, 
requireMax(1d));
+    }
   }
 
   interface Watch {
diff --git 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index e62a48f..3723600 100644
--- 
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ 
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -79,6 +79,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.ratis.server.RaftServer.Division.LOG;
+import static 
org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;
 
 /**
  * States for leader only. It contains three different types of processors:
@@ -250,6 +251,7 @@ class LeaderStateImpl implements LeaderState {
   private final long placeHolderIndex;
   private final RaftServerMetricsImpl raftServerMetrics;
   private final LogAppenderMetrics logAppenderMetrics;
+  private final long followerMaxGapThreshold;
 
   LeaderStateImpl(RaftServerImpl server) {
     this.name = server.getMemberId() + "-" + 
JavaUtils.getClassSimpleName(getClass());
@@ -269,6 +271,17 @@ class LeaderStateImpl implements LeaderState {
     this.pendingRequests = new PendingRequests(server.getMemberId(), 
properties, raftServerMetrics);
     this.watchRequests = new WatchRequests(server.getMemberId(), properties);
     this.messageStreamRequests = new 
MessageStreamRequests(server.getMemberId());
+    long maxPendingRequests = 
RaftServerConfigKeys.Write.elementLimit(properties);
+    double followerGapRatioMax = 
RaftServerConfigKeys.Write.followerGapRatioMax(properties);
+
+    if (followerGapRatioMax == -1) {
+      this.followerMaxGapThreshold = -1;
+    } else if (followerGapRatioMax > 1f || followerGapRatioMax <= 0f) {
+      throw new IllegalArgumentException(FOLLOWER_GAP_RATIO_MAX_KEY +
+          "s value must between [1, 0) to enable the feature");
+    } else {
+      this.followerMaxGapThreshold = (long) (followerGapRatioMax * 
maxPendingRequests);
+    }
 
     final RaftConfigurationImpl conf = state.getRaftConf();
     Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
@@ -700,6 +713,17 @@ class LeaderStateImpl implements LeaderState {
       return new MinMajorityMax(sorted[0], getMajority(sorted), 
getMax(sorted));
     }
 
+    static MinMajorityMax valueOf(long[] sorted, long gapThreshold) {
+      long majority = getMajority(sorted);
+      long min = sorted[0];
+      if (gapThreshold != -1 && (majority - min) > gapThreshold) {
+        // If the gap between majority and min (the slow follower) is greater 
than gapThreshold,
+        // set the majority to min, which will skip one round of 
lastCommittedIndex update in updateCommit().
+        majority = min;
+      }
+      return new MinMajorityMax(min, majority, getMax(sorted));
+    }
+
     static long getMajority(long[] sorted) {
       return sorted[(sorted.length - 1) / 2];
     }
@@ -725,7 +749,7 @@ class LeaderStateImpl implements LeaderState {
     }
 
     final long[] indicesInNewConf = getSorted(followers, includeSelf, 
followerIndex, logIndex);
-    final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf);
+    final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf, 
followerMaxGapThreshold);
 
     if (!conf.isTransitional()) {
       return Optional.of(newConf);
@@ -737,7 +761,7 @@ class LeaderStateImpl implements LeaderState {
       }
 
       final long[] indicesInOldConf = getSorted(oldFollowers, 
includeSelfInOldConf, followerIndex, logIndex);
-      final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf);
+      final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf, 
followerMaxGapThreshold);
       return Optional.of(newConf.combine(oldConf));
     }
   }

Reply via email to