This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 326fc81d7a [IOTDB-3642] Add retry mechanism when resource unavailable 
(#7240)
326fc81d7a is described below

commit 326fc81d7a8c97ce05e297be21d5224b1257e830
Author: William Song <[email protected]>
AuthorDate: Wed Sep 7 08:44:06 2022 +0800

    [IOTDB-3642] Add retry mechanism when resource unavailable (#7240)
---
 .../apache/iotdb/consensus/config/RatisConfig.java | 59 +++++++++++++++++++++-
 .../iotdb/consensus/ratis/RatisConsensus.java      | 53 ++++++++++++++++++-
 2 files changed, 108 insertions(+), 4 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index e976137287..877a361c4a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -34,6 +34,7 @@ public class RatisConfig {
   private final ThreadPool threadPool;
   private final Log log;
   private final Grpc grpc;
+  private final RatisConsensus ratisConsensus;
 
   private RatisConfig(
       Rpc rpc,
@@ -41,13 +42,15 @@ public class RatisConfig {
       Snapshot snapshot,
       ThreadPool threadPool,
       Log log,
-      Grpc grpc) {
+      Grpc grpc,
+      RatisConsensus ratisConsensus) {
     this.rpc = rpc;
     this.leaderElection = leaderElection;
     this.snapshot = snapshot;
     this.threadPool = threadPool;
     this.log = log;
     this.grpc = grpc;
+    this.ratisConsensus = ratisConsensus;
   }
 
   public Rpc getRpc() {
@@ -74,6 +77,10 @@ public class RatisConfig {
     return grpc;
   }
 
+  /** Returns the {@link RatisConsensus} settings held by this config. */
+  public RatisConsensus getRatisConsensus() {
+    return ratisConsensus;
+  }
+
   public static Builder newBuilder() {
     return new Builder();
   }
@@ -85,6 +92,7 @@ public class RatisConfig {
     private ThreadPool threadPool;
     private Log log;
     private Grpc grpc;
+    private RatisConsensus ratisConsensus;
 
     public RatisConfig build() {
       return new RatisConfig(
@@ -93,7 +101,8 @@ public class RatisConfig {
           snapshot != null ? snapshot : Snapshot.newBuilder().build(),
           threadPool != null ? threadPool : ThreadPool.newBuilder().build(),
           log != null ? log : Log.newBuilder().build(),
-          grpc != null ? grpc : Grpc.newBuilder().build());
+          grpc != null ? grpc : Grpc.newBuilder().build(),
+          ratisConsensus != null ? ratisConsensus : 
RatisConsensus.newBuilder().build());
     }
 
     public Builder setRpc(Rpc rpc) {
@@ -125,6 +134,11 @@ public class RatisConfig {
       this.grpc = grpc;
       return this;
     }
+
+    /**
+     * Sets the {@link RatisConsensus} settings for the config being built.
+     *
+     * @param ratisConsensus settings to use; when this is still null at build time, build()
+     *     substitutes RatisConsensus.newBuilder().build()
+     * @return this builder, to allow call chaining
+     */
+    public Builder setRatisConsensus(RatisConsensus ratisConsensus) {
+      this.ratisConsensus = ratisConsensus;
+      return this;
+    }
   }
 
   /** server rpc timeout related */
@@ -692,4 +706,45 @@ public class RatisConfig {
       }
     }
   }
+
+  /**
+   * Immutable holder for two retry settings: a retry limit and a wait time in milliseconds.
+   * Instances are created through {@link Builder}.
+   */
+  public static class RatisConsensus {
+    private final int retryTimesMax;
+    private final long retryWaitMillis;
+
+    private RatisConsensus(int maxTimes, long waitMillis) {
+      retryTimesMax = maxTimes;
+      retryWaitMillis = waitMillis;
+    }
+
+    /** Creates a builder that starts from the default values. */
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    /** Returns the configured retry limit. */
+    public int getRetryTimesMax() {
+      return retryTimesMax;
+    }
+
+    /** Returns the configured wait time in milliseconds. */
+    public long getRetryWaitMillis() {
+      return retryWaitMillis;
+    }
+
+    /** Fluent builder; the defaults are a retry limit of 3 and a wait of 500 ms. */
+    public static class Builder {
+      private int retryTimesMax = 3;
+      private long retryWaitMillis = 500L;
+
+      public Builder setRetryTimesMax(int retryTimesMax) {
+        this.retryTimesMax = retryTimesMax;
+        return this;
+      }
+
+      public Builder setRetryWaitMillis(long retryWaitMillis) {
+        this.retryWaitMillis = retryWaitMillis;
+        return this;
+      }
+
+      /** Builds the settings object from the values currently held by this builder. */
+      public RatisConsensus build() {
+        return new RatisConsensus(retryTimesMax, retryWaitMillis);
+      }
+    }
+  }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index fcab610918..1abf6a473a 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -38,6 +38,7 @@ import 
org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
 import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.RatisConfig;
 import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.NodeReadOnlyException;
@@ -62,9 +63,11 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.SnapshotManagementRequest;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.function.CheckedSupplier;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,6 +114,8 @@ class RatisConsensus implements IConsensus {
   // TODO make it configurable
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
+  private final RatisConfig config;
+
   public RatisConsensus(ConsensusConfig config, IStateMachine.Registry 
registry)
       throws IOException {
     myself = Utils.fromTEndPointAndPriorityToRaftPeer(config.getThisNode(), 
DEFAULT_PRIORITY);
@@ -122,6 +127,7 @@ class RatisConsensus implements IConsensus {
     GrpcConfigKeys.Server.setPort(properties, config.getThisNode().getPort());
 
     Utils.initRatisConfig(properties, config.getRatisConfig());
+    this.config = config.getRatisConfig();
 
     clientRpc = new GrpcFactory(new 
Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
 
@@ -148,6 +154,49 @@ class RatisConsensus implements IConsensus {
     server.close();
   }
 
+  /**
+   * Decides whether a write should be sent again. Currently the only retried case is an
+   * unsuccessful reply that carries a ResourceUnavailableException.
+   */
+  private boolean shouldRetry(RaftClientReply reply) {
+    if (reply.isSuccess()) {
+      return false;
+    }
+    // instanceof is false for null, so a failed reply without an exception is not retried
+    return reply.getException() instanceof ResourceUnavailableException;
+  }
+  /**
+   * Launches a consensus write and re-sends it while {@link #shouldRetry} accepts the reply.
+   *
+   * <p>The write is always attempted at least once, so the returned reply is never null even if
+   * the configured retry limit is zero or negative. The thread waits the configured number of
+   * milliseconds between two attempts, but not after the last one. If the wait is interrupted,
+   * the interrupt flag is restored and the most recent reply is returned without further
+   * attempts.
+   *
+   * @param caller performs one write attempt and returns its reply
+   * @return the reply of the last attempt that was made
+   * @throws IOException if an attempt itself throws
+   */
+  private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller)
+      throws IOException {
+    // Clamp to one attempt: callers dereference the reply right away and must never see null.
+    final int maxAttempts = Math.max(1, config.getRatisConsensus().getRetryTimesMax());
+    final long waitMillis = config.getRatisConsensus().getRetryWaitMillis();
+
+    RaftClientReply reply = caller.get();
+    for (int attempt = 1; attempt < maxAttempts && shouldRetry(reply); attempt++) {
+      logger.debug(
+          "{} sending write request with retry = {} and reply = {}", this, attempt, reply);
+      try {
+        Thread.sleep(waitMillis);
+      } catch (InterruptedException e) {
+        // The trailing Throwable is logged as the exception; it must not have its own "{}".
+        logger.warn("{} retry write sleep is interrupted", this, e);
+        Thread.currentThread().interrupt();
+        // Stop here: with the flag set, every later sleep would throw at once and the
+        // remaining attempts would run without any back-off.
+        return reply;
+      }
+      reply = caller.get();
+    }
+    return reply;
+  }
+
+  /** Submits the request to the local server, going through {@link #writeWithRetry}. */
+  private RaftClientReply writeLocallyWithRetry(RaftClientRequest request) throws IOException {
+    final CheckedSupplier<RaftClientReply, IOException> localSubmit =
+        () -> server.submitClientRequest(request);
+    return writeWithRetry(localSubmit);
+  }
+
+  /** Sends the message through the given client, going through {@link #writeWithRetry}. */
+  private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message message)
+      throws IOException {
+    final CheckedSupplier<RaftClientReply, IOException> remoteSend =
+        () -> client.getRaftClient().io().send(message);
+    return writeWithRetry(remoteSend);
+  }
+
   /**
    * write will first send the request to the local server via a method call; if the local
    * server is not the leader, it will use RaftClient to send an RPC to the real leader
@@ -183,7 +232,7 @@ class RatisConsensus implements IConsensus {
     RaftPeer suggestedLeader = null;
     if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
       try {
-        localServerReply = server.submitClientRequest(clientRequest);
+        localServerReply = writeLocallyWithRetry(clientRequest);
         if (localServerReply.isSuccess()) {
           ResponseMessage responseMessage = (ResponseMessage) 
localServerReply.getMessage();
           TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
@@ -203,7 +252,7 @@ class RatisConsensus implements IConsensus {
     RatisClient client = null;
     try {
       client = getRaftClient(raftGroup);
-      RaftClientReply reply = client.getRaftClient().io().send(message);
+      RaftClientReply reply = writeRemotelyWithRetry(client, message);
       if (!reply.isSuccess()) {
         return failedWrite(new 
RatisRequestFailedException(reply.getException()));
       }

Reply via email to