This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 837b063 RATIS-1411. Alleviate slow follower issue (#508)
837b063 is described below
commit 837b063a4e11edd241759c7b967af44120b25f60
Author: Sammi Chen <[email protected]>
AuthorDate: Thu Sep 30 18:34:56 2021 +0800
RATIS-1411. Alleviate slow follower issue (#508)
---
.../main/java/org/apache/ratis/conf/ConfUtils.java | 23 ++++++++++++++++++
.../apache/ratis/server/RaftServerConfigKeys.java | 12 ++++++++++
.../apache/ratis/server/impl/LeaderStateImpl.java | 28 ++++++++++++++++++++--
3 files changed, 61 insertions(+), 2 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
index 989070f..1d7d9c5 100644
--- a/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/conf/ConfUtils.java
@@ -68,6 +68,15 @@ public interface ConfUtils {
};
}
+ /**
+  * Builds an assertion that rejects any value strictly greater than the given maximum.
+  *
+  * @param max the largest accepted value (inclusive).
+  * @return a checker taking the conf key and its value; it throws
+  *         {@link IllegalArgumentException} when the value exceeds {@code max}.
+  */
+ static BiConsumer<String, Double> requireMax(double max) {
+   return (key, value) -> {
+     final boolean withinLimit = !(value > max);
+     if (withinLimit) {
+       return;
+     }
+     throw new IllegalArgumentException(key + " = " + value + " > max = " + max);
+   };
+ }
+
static BiConsumer<String, Long> requireMin(SizeInBytes min) {
return requireMin(min.getSize());
}
@@ -150,6 +159,13 @@ public interface ConfUtils {
}
+ /**
+  * Reads a {@code double} conf value by forwarding every argument to the generic
+  * {@code get} helper of this interface.
+  *
+  * @param doubleGetter the lookup function, called with a key and a default value.
+  * @param key the conf key.
+  * @param defaultValue the default handed to {@code get}.
+  * @param logger a consumer of log messages, handed to {@code get}.
+  * @param assertions checks handed to {@code get}; presumably applied to the value read.
+  * @return the value produced by {@code get}, unboxed.
+  */
@SafeVarargs
+ static double getDouble(
+     BiFunction<String, Double, Double> doubleGetter,
+     String key, double defaultValue, Consumer<String> logger,
+     BiConsumer<String, Double>... assertions) {
+   final double value = get(doubleGetter, key, defaultValue, logger, assertions);
+   return value;
+ }
+
+ @SafeVarargs
static File getFile(
BiFunction<String, File, File> fileGetter,
String key, File defaultValue, Consumer<String> logger,
BiConsumer<String, File>... assertions) {
@@ -219,6 +235,13 @@ public interface ConfUtils {
}
+ /**
+  * Writes a {@code double} conf value by forwarding every argument to the generic
+  * {@code set} helper of this interface.
+  *
+  * @param doubleSetter the store function, called with a key and a value.
+  * @param key the conf key.
+  * @param value the value to store.
+  * @param assertions checks handed to {@code set}; presumably applied to {@code value}.
+  */
@SafeVarargs
+ static void setDouble(
+     BiConsumer<String, Double> doubleSetter,
+     String key,
+     double value,
+     BiConsumer<String, Double>... assertions) {
+   set(doubleSetter, key, value, assertions);
+ }
+
+ @SafeVarargs
static void setFile(
BiConsumer<String, File> fileSetter, String key, File value,
BiConsumer<String, File>... assertions) {
diff --git
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 7748413..f27fda2 100644
---
a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++
b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -119,6 +119,18 @@ public interface RaftServerConfigKeys {
static void setByteLimit(RaftProperties properties, SizeInBytes byteLimit)
{
setSizeInBytes(properties::set, BYTE_LIMIT_KEY, byteLimit,
requireMin(1L));
}
+
+ String FOLLOWER_GAP_RATIO_MAX_KEY = PREFIX + ".follower.gap.ratio.max";
+ // The valid range is (0, 1]; the special value -1 disables this feature.
+ double FOLLOWER_GAP_RATIO_MAX_DEFAULT = -1d;
+
+ /**
+  * Reads {@code FOLLOWER_GAP_RATIO_MAX_KEY} from the given properties, passing
+  * {@code FOLLOWER_GAP_RATIO_MAX_DEFAULT} as the default.
+  * Only the upper bound (at most 1) is asserted here.
+  *
+  * @param properties the properties to read from.
+  * @return the value returned by {@code getDouble}.
+  */
+ static double followerGapRatioMax(RaftProperties properties) {
+   final double ratio = getDouble(
+       properties::getDouble,
+       FOLLOWER_GAP_RATIO_MAX_KEY,
+       FOLLOWER_GAP_RATIO_MAX_DEFAULT,
+       getDefaultLog(),
+       requireMax(1d));
+   return ratio;
+ }
+ /**
+  * Stores {@code ratio} under {@code FOLLOWER_GAP_RATIO_MAX_KEY} in the given properties.
+  *
+  * The parameter is a {@code double}, the same type used by
+  * {@code followerGapRatioMax(RaftProperties)}, {@code FOLLOWER_GAP_RATIO_MAX_DEFAULT}
+  * and {@code setDouble}. A decimal such as 0.3 is therefore stored as written instead of
+  * first being rounded to {@code float} and then widened (0.3f becomes 0.30000001192092896).
+  * Callers that pass a {@code float} still compile through implicit widening.
+  *
+  * Only the upper bound (at most 1) is asserted here.
+  *
+  * @param properties the properties to update.
+  * @param ratio the maximum follower gap ratio.
+  * @throws IllegalArgumentException if {@code ratio} is greater than 1.
+  */
+ static void setFollowerGapRatioMax(RaftProperties properties, double ratio) {
+   setDouble(properties::setDouble, FOLLOWER_GAP_RATIO_MAX_KEY, ratio, requireMax(1d));
+ }
}
interface Watch {
diff --git
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index e62a48f..3723600 100644
---
a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++
b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -79,6 +79,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.ratis.server.RaftServer.Division.LOG;
+import static
org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;
/**
* States for leader only. It contains three different types of processors:
@@ -250,6 +251,7 @@ class LeaderStateImpl implements LeaderState {
private final long placeHolderIndex;
private final RaftServerMetricsImpl raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
+ private final long followerMaxGapThreshold;
LeaderStateImpl(RaftServerImpl server) {
this.name = server.getMemberId() + "-" +
JavaUtils.getClassSimpleName(getClass());
@@ -269,6 +271,17 @@ class LeaderStateImpl implements LeaderState {
this.pendingRequests = new PendingRequests(server.getMemberId(),
properties, raftServerMetrics);
this.watchRequests = new WatchRequests(server.getMemberId(), properties);
this.messageStreamRequests = new
MessageStreamRequests(server.getMemberId());
+ long maxPendingRequests =
RaftServerConfigKeys.Write.elementLimit(properties);
+ double followerGapRatioMax =
RaftServerConfigKeys.Write.followerGapRatioMax(properties);
+
+ if (followerGapRatioMax == -1) {
+ this.followerMaxGapThreshold = -1;
+ } else if (followerGapRatioMax > 1f || followerGapRatioMax <= 0f) {
+ throw new IllegalArgumentException(FOLLOWER_GAP_RATIO_MAX_KEY +
+ "s value must between [1, 0) to enable the feature");
+ } else {
+ this.followerMaxGapThreshold = (long) (followerGapRatioMax *
maxPendingRequests);
+ }
final RaftConfigurationImpl conf = state.getRaftConf();
Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
@@ -700,6 +713,17 @@ class LeaderStateImpl implements LeaderState {
return new MinMajorityMax(sorted[0], getMajority(sorted),
getMax(sorted));
}
+ /**
+  * Builds a {@code MinMajorityMax} from the given indices, optionally holding the
+  * majority back when the slowest entry lags too far behind.
+  *
+  * @param sorted the indices; {@code sorted[0]} is used as the minimum, so the array is
+  *               expected to be in ascending order and non-empty.
+  * @param gapThreshold the largest tolerated difference between the majority and the
+  *                     minimum; -1 disables the check.
+  * @return the min, (possibly lowered) majority and max of {@code sorted}.
+  */
+ static MinMajorityMax valueOf(long[] sorted, long gapThreshold) {
+   final long majorityIndex = getMajority(sorted);
+   final long slowest = sorted[0];
+   final boolean enabled = gapThreshold != -1;
+   // When the gap between the majority and the min (the slow follower) exceeds the
+   // threshold, report the min as the majority, which will skip one round of
+   // lastCommittedIndex update in updateCommit().
+   final boolean gapTooLarge = enabled && majorityIndex - slowest > gapThreshold;
+   final long effectiveMajority = gapTooLarge ? slowest : majorityIndex;
+   return new MinMajorityMax(slowest, effectiveMajority, getMax(sorted));
+ }
+
static long getMajority(long[] sorted) {
return sorted[(sorted.length - 1) / 2];
}
@@ -725,7 +749,7 @@ class LeaderStateImpl implements LeaderState {
}
final long[] indicesInNewConf = getSorted(followers, includeSelf,
followerIndex, logIndex);
- final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf);
+ final MinMajorityMax newConf = MinMajorityMax.valueOf(indicesInNewConf,
followerMaxGapThreshold);
if (!conf.isTransitional()) {
return Optional.of(newConf);
@@ -737,7 +761,7 @@ class LeaderStateImpl implements LeaderState {
}
final long[] indicesInOldConf = getSorted(oldFollowers,
includeSelfInOldConf, followerIndex, logIndex);
- final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf);
+ final MinMajorityMax oldConf = MinMajorityMax.valueOf(indicesInOldConf,
followerMaxGapThreshold);
return Optional.of(newConf.combine(oldConf));
}
}