This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ec1cfc559 RATIS-1836. Frequent retry by leader on failure path for appendEntry (#883)
ec1cfc559 is described below
commit ec1cfc559c0ae71ba8240852eaa5fc2ab1c28c9e
Author: Sumit Agrawal <[email protected]>
AuthorDate: Fri Jun 2 13:21:43 2023 +0530
RATIS-1836. Frequent retry by leader on failure path for appendEntry (#883)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 22 +++++++++++++++++++++-
.../apache/ratis/server/RaftServerConfigKeys.java | 15 +++++++++++++++
.../test/java/org/apache/ratis/RaftBasicTests.java | 2 +-
.../ratis/server/impl/ServerPauseResumeTest.java | 2 ++
4 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 69b030bf0..3544f3be1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -24,6 +24,8 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.MultipleLinearRandomRetry;
+import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
@@ -47,6 +49,7 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -80,6 +83,8 @@ public class GrpcLogAppender extends LogAppenderBase {
private final AutoCloseableReadWriteLock lock;
private final StackTraceElement caller;
+ private final RetryPolicy errorRetryWaitPolicy;
+ private final AtomicInteger errCount = new AtomicInteger(0);
public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState,
FollowerInfo f) {
super(server, leaderState, f);
@@ -100,6 +105,8 @@ public class GrpcLogAppender extends LogAppenderBase {
lock = new AutoCloseableReadWriteLock(this);
caller = LOG.isTraceEnabled()? JavaUtils.getCallerStackTraceElement():
null;
+ errorRetryWaitPolicy = MultipleLinearRandomRetry.parseCommaSeparated(
+ RaftServerConfigKeys.Log.Appender.retryPolicy(properties));
}
@Override
@@ -193,13 +200,19 @@ public class GrpcLogAppender extends LogAppenderBase {
private void mayWait() {
// use lastSend time instead of lastResponse time
try {
- getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
+ getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(),
+ TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
LOG.warn(this + ": Wait interrupted by " + ie);
Thread.currentThread().interrupt();
}
}
+ private long errorWaitTimeMs() {
+ return errorRetryWaitPolicy.handleAttemptFailure(errCount::get)
+ .getSleepTime().toLong(TimeUnit.MILLISECONDS);
+ }
+
@Override
public void stop() {
grpcServerMetrics.unregister();
@@ -316,6 +329,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private void timeoutAppendRequest(long cid, boolean heartbeat) {
final AppendEntriesRequest pending = pendingRequests.handleTimeout(cid,
heartbeat);
if (pending != null) {
+ errCount.incrementAndGet();
LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ?
"HEARTBEAT" : "", pending);
grpcServerMetrics.onRequestTimeout(getFollowerId().toString(),
heartbeat);
pending.stopRequestTimer();
@@ -373,6 +387,7 @@ public class GrpcLogAppender extends LogAppenderBase {
private void onNextImpl(AppendEntriesReplyProto reply) {
// update the last rpc time
getFollower().updateLastRpcResponseTime();
+ errCount.set(0);
if (!firstResponseReceived) {
firstResponseReceived = true;
@@ -414,6 +429,7 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.info("{} is already stopped", GrpcLogAppender.this);
return;
}
+ errCount.incrementAndGet();
GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
grpcServerMetrics.onRequestRetry(); // Update try counter
AppendEntriesRequest request =
pendingRequests.remove(GrpcUtil.getCallId(t), GrpcUtil.isHeartbeat(t));
@@ -423,6 +439,7 @@ public class GrpcLogAppender extends LogAppenderBase {
@Override
public void onCompleted() {
LOG.info("{}: follower responses appendEntries COMPLETED", this);
+ errCount.set(0);
resetClient(null, false);
}
@@ -507,6 +524,7 @@ public class GrpcLogAppender extends LogAppenderBase {
// update the last rpc time
getFollower().updateLastRpcResponseTime();
+ errCount.set(0);
if (!firstResponseReceived) {
firstResponseReceived = true;
@@ -570,6 +588,7 @@ public class GrpcLogAppender extends LogAppenderBase {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
+ errCount.incrementAndGet();
GrpcUtil.warn(LOG, () -> this + ": Failed InstallSnapshot", t);
grpcServerMetrics.onRequestRetry(); // Update try counter
resetClient(null, true);
@@ -581,6 +600,7 @@ public class GrpcLogAppender extends LogAppenderBase {
if (!isNotificationOnly || LOG.isDebugEnabled()) {
LOG.info("{}: follower responded installSnapshot COMPLETED", this);
}
+ errCount.set(0);
close();
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 6c5aa1212..2fc025535 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -553,6 +553,21 @@ public interface RaftServerConfigKeys {
static void setWaitTimeMin(RaftProperties properties, TimeDuration
minDuration) {
setTimeDuration(properties::setTimeDuration, WAIT_TIME_MIN_KEY,
minDuration);
}
+
+ String RETRY_POLICY_KEY = PREFIX + ".retry.policy";
+ /**
+ * The min wait time as 1ms (0 is not allowed) for first 10,
+ * (5 iteration with 2 times grpc client retry)
+ * next wait 1sec for next 20 retry (10 iteration with 2 times grpc
client)
+ * further wait for 5sec for max times ((5sec*980)/2 times ~= 40min)
+ */
+ String RETRY_POLICY_DEFAULT = "1ms,10, 1s,20, 5s,1000";
+ static String retryPolicy(RaftProperties properties) {
+ return properties.get(RETRY_POLICY_KEY, RETRY_POLICY_DEFAULT);
+ }
+ static void setRetryPolicy(RaftProperties properties, String
retryPolicy) {
+ properties.set(RETRY_POLICY_KEY, retryPolicy);
+ }
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 8bfe66389..47c9b0e08 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -169,7 +169,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
final List<RaftServer.Division> divisions =
cluster.getServerAliveStream().collect(Collectors.toList());
for(RaftServer.Division impl: divisions) {
JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term,
messages),
- 5, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + "
assertLogEntries", LOG);
+ 50, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + "
assertLogEntries", LOG);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
index 9f2ae560c..d2584c631 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
@@ -27,6 +27,7 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.raftlog.RaftLog;
import org.junit.Assert;
import org.junit.Test;
@@ -40,6 +41,7 @@ public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
@Test
public void testPauseResume() throws Exception {
+ RaftServerConfigKeys.Log.Appender.setRetryPolicy(getProperties(),
"1ms,1000");
runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
}