This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ec1cfc559 RATIS-1836. Frequent retry by leader on failure path for appendEntry (#883)
ec1cfc559 is described below

commit ec1cfc559c0ae71ba8240852eaa5fc2ab1c28c9e
Author: Sumit Agrawal <[email protected]>
AuthorDate: Fri Jun 2 13:21:43 2023 +0530

    RATIS-1836. Frequent retry by leader on failure path for appendEntry (#883)
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 22 +++++++++++++++++++++-
 .../apache/ratis/server/RaftServerConfigKeys.java  | 15 +++++++++++++++
 .../test/java/org/apache/ratis/RaftBasicTests.java |  2 +-
 .../ratis/server/impl/ServerPauseResumeTest.java   |  2 ++
 4 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 69b030bf0..3544f3be1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -24,6 +24,8 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.metrics.Timekeeper;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.MultipleLinearRandomRetry;
+import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.FollowerInfo;
@@ -47,6 +49,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 /**
@@ -80,6 +83,8 @@ public class GrpcLogAppender extends LogAppenderBase {
 
   private final AutoCloseableReadWriteLock lock;
   private final StackTraceElement caller;
+  private final RetryPolicy errorRetryWaitPolicy;
+  private final AtomicInteger errCount = new AtomicInteger(0);
 
   public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
     super(server, leaderState, f);
@@ -100,6 +105,8 @@ public class GrpcLogAppender extends LogAppenderBase {
 
     lock = new AutoCloseableReadWriteLock(this);
     caller = LOG.isTraceEnabled()? JavaUtils.getCallerStackTraceElement(): null;
+    errorRetryWaitPolicy = MultipleLinearRandomRetry.parseCommaSeparated(
+        RaftServerConfigKeys.Log.Appender.retryPolicy(properties));
   }
 
   @Override
@@ -193,13 +200,19 @@ public class GrpcLogAppender extends LogAppenderBase {
   private void mayWait() {
     // use lastSend time instead of lastResponse time
     try {
-      getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
+      getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(),
+          TimeUnit.MILLISECONDS);
     } catch (InterruptedException ie) {
       LOG.warn(this + ": Wait interrupted by " + ie);
       Thread.currentThread().interrupt();
     }
   }
 
+  private long errorWaitTimeMs() {
+    return errorRetryWaitPolicy.handleAttemptFailure(errCount::get)
+        .getSleepTime().toLong(TimeUnit.MILLISECONDS);
+  }
+
   @Override
   public void stop() {
     grpcServerMetrics.unregister();
@@ -316,6 +329,7 @@ public class GrpcLogAppender extends LogAppenderBase {
   private void timeoutAppendRequest(long cid, boolean heartbeat) {
     final AppendEntriesRequest pending = pendingRequests.handleTimeout(cid, heartbeat);
     if (pending != null) {
+      errCount.incrementAndGet();
       LOG.warn("{}: {} appendEntries Timeout, request={}", this, heartbeat ? "HEARTBEAT" : "", pending);
       grpcServerMetrics.onRequestTimeout(getFollowerId().toString(), heartbeat);
       pending.stopRequestTimer();
@@ -373,6 +387,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     private void onNextImpl(AppendEntriesReplyProto reply) {
       // update the last rpc time
       getFollower().updateLastRpcResponseTime();
+      errCount.set(0);
 
       if (!firstResponseReceived) {
         firstResponseReceived = true;
@@ -414,6 +429,7 @@ public class GrpcLogAppender extends LogAppenderBase {
         LOG.info("{} is already stopped", GrpcLogAppender.this);
         return;
       }
+      errCount.incrementAndGet();
       GrpcUtil.warn(LOG, () -> this + ": Failed appendEntries", t);
       grpcServerMetrics.onRequestRetry(); // Update try counter
       AppendEntriesRequest request = pendingRequests.remove(GrpcUtil.getCallId(t), GrpcUtil.isHeartbeat(t));
@@ -423,6 +439,7 @@ public class GrpcLogAppender extends LogAppenderBase {
     @Override
     public void onCompleted() {
       LOG.info("{}: follower responses appendEntries COMPLETED", this);
+      errCount.set(0);
       resetClient(null, false);
     }
 
@@ -507,6 +524,7 @@ public class GrpcLogAppender extends LogAppenderBase {
 
       // update the last rpc time
       getFollower().updateLastRpcResponseTime();
+      errCount.set(0);
 
       if (!firstResponseReceived) {
         firstResponseReceived = true;
@@ -570,6 +588,7 @@ public class GrpcLogAppender extends LogAppenderBase {
         LOG.info("{} is stopped", GrpcLogAppender.this);
         return;
       }
+      errCount.incrementAndGet();
       GrpcUtil.warn(LOG, () -> this + ": Failed InstallSnapshot", t);
       grpcServerMetrics.onRequestRetry(); // Update try counter
       resetClient(null, true);
@@ -581,6 +600,7 @@ public class GrpcLogAppender extends LogAppenderBase {
       if (!isNotificationOnly || LOG.isDebugEnabled()) {
         LOG.info("{}: follower responded installSnapshot COMPLETED", this);
       }
+      errCount.set(0);
       close();
     }
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 6c5aa1212..2fc025535 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -553,6 +553,21 @@ public interface RaftServerConfigKeys {
       static void setWaitTimeMin(RaftProperties properties, TimeDuration minDuration) {
         setTimeDuration(properties::setTimeDuration, WAIT_TIME_MIN_KEY, minDuration);
       }
+
+      String RETRY_POLICY_KEY = PREFIX + ".retry.policy";
+      /**
+       * Wait 1ms (0 is not allowed) for the first 10 retries
+       * (5 iterations with 2 gRPC client retries each),
+       * then wait 1s for the next 20 retries (10 iterations with 2 gRPC client retries each),
+       * then wait 5s for up to 1000 further retries (1000 / 2 iterations * 5s ~= 42min).
+       */
+      String RETRY_POLICY_DEFAULT = "1ms,10, 1s,20, 5s,1000";
+      static String retryPolicy(RaftProperties properties) {
+        return properties.get(RETRY_POLICY_KEY, RETRY_POLICY_DEFAULT);
+      }
+      static void setRetryPolicy(RaftProperties properties, String retryPolicy) {
+        properties.set(RETRY_POLICY_KEY, retryPolicy);
+      }
     }
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 8bfe66389..47c9b0e08 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -169,7 +169,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
     final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
     for(RaftServer.Division impl: divisions) {
         JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
-            5, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
+            50, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
     }
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
index 9f2ae560c..d2584c631 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
@@ -27,6 +27,7 @@ import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.junit.Assert;
 import org.junit.Test;
@@ -40,6 +41,7 @@ public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
 
   @Test
   public void testPauseResume() throws Exception {
+    RaftServerConfigKeys.Log.Appender.setRetryPolicy(getProperties(), "1ms,1000");
     runWithNewCluster(NUM_SERVERS, this::runTestPauseResume);
   }
 

Reply via email to