This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8901076ec17 [IOTDB-6227] Add retry for RatisConsensus read (#11458)
8901076ec17 is described below

commit 8901076ec17f2a54c393567e1fcd51f099a176f8
Author: William Song <[email protected]>
AuthorDate: Fri Nov 10 11:10:44 2023 +0800

    [IOTDB-6227] Add retry for RatisConsensus read (#11458)
---
 .../ratis/ApplicationStateMachineProxy.java        |  16 +--
 .../iotdb/consensus/ratis/RatisConsensus.java      |  91 +++++++++++-------
 .../iotdb/consensus/ratis/utils/Retriable.java     | 107 +++++++++++++++++++++
 .../iotdb/consensus/ratis/utils/RetryPolicy.java   |  79 +++++++++++++++
 .../apache/iotdb/consensus/ratis/utils/Utils.java  |  10 ++
 .../iotdb/consensus/ratis/RecoverReadTest.java     |   3 +-
 6 files changed, 262 insertions(+), 44 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index fda00ab8504..1460398ab05 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.Retriable;
 import org.apache.iotdb.consensus.ratis.utils.Utils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -46,6 +47,7 @@ import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,7 +58,6 @@ import java.nio.file.StandardCopyOption;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class ApplicationStateMachineProxy extends BaseStateMachine {
@@ -182,13 +183,12 @@ public class ApplicationStateMachineProxy extends 
BaseStateMachine {
   }
 
  private void waitUntilSystemAllowApply() {
-    while (Utils.stallApply()) {
-      try {
-        TimeUnit.SECONDS.sleep(60);
-      } catch (InterruptedException e) {
-        logger.warn("{}: interrupted when waiting until system ready: ", this, e);
-        Thread.currentThread().interrupt();
-      }
+    // Block this call while Utils.stallApply() returns true, re-checking the condition once per
+    // minute (TimeDuration.ONE_MINUTE) via Retriable.attemptUntilTrue.
+    // NOTE(review): presumably stallApply() signals that the system is not yet ready to apply
+    // entries (cf. the warning below) -- confirm against Utils.
+    try {
+      Retriable.attemptUntilTrue(
+          () -> !Utils.stallApply(), TimeDuration.ONE_MINUTE, "waitUntilSystemAllowApply", logger);
+    } catch (InterruptedException e) {
+      // Stop waiting: log, restore the interrupt flag so callers can still observe it, and return.
+      logger.warn("{}: interrupted when waiting until system ready: ", this, e);
+      Thread.currentThread().interrupt();
     }
   }
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index a4bbf19cb8a..a4aece95010 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -48,6 +48,8 @@ import 
org.apache.iotdb.consensus.exception.RatisRequestFailedException;
 import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
 import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.Retriable;
+import org.apache.iotdb.consensus.ratis.utils.RetryPolicy;
 import org.apache.iotdb.consensus.ratis.utils.Utils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -94,6 +96,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 /** A multi-raft consensus implementation based on Apache Ratis. */
@@ -124,6 +127,8 @@ class RatisConsensus implements IConsensus {
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
   private final RatisConfig config;
+  private final RetryPolicy<RaftClientReply> readRetryPolicy;
+  private final RetryPolicy<RaftClientReply> writeRetryPolicy;
 
   private final RatisMetricSet ratisMetricSet;
   private final TConsensusGroupType consensusGroupType;
@@ -145,6 +150,26 @@ class RatisConsensus implements IConsensus {
     this.config = config.getRatisConfig();
     this.consensusGroupType = config.getConsensusGroupType();
     this.ratisMetricSet = new RatisMetricSet();
+    this.readRetryPolicy =
+        RetryPolicy.<RaftClientReply>newBuilder()
+            .setRetryHandler(c -> !c.isSuccess() && c.getException() 
instanceof ReadIndexException)
+            .setMaxAttempts(this.config.getImpl().getRetryTimesMax())
+            .setWaitTime(
+                TimeDuration.valueOf(
+                    this.config.getImpl().getRetryWaitMillis(), 
TimeUnit.MILLISECONDS))
+            .build();
+    this.writeRetryPolicy =
+        RetryPolicy.<RaftClientReply>newBuilder()
+            // currently, we only retry when ResourceUnavailableException is 
caught
+            .setRetryHandler(
+                reply ->
+                    !reply.isSuccess()
+                        && (reply.getException() instanceof 
ResourceUnavailableException))
+            .setMaxAttempts(this.config.getImpl().getRetryTimesMax())
+            .setWaitTime(
+                TimeDuration.valueOf(
+                    this.config.getImpl().getRetryWaitMillis(), 
TimeUnit.MILLISECONDS))
+            .build();
 
     this.diskGuardian = new DiskGuardian(() -> this, this.config);
 
@@ -188,36 +213,17 @@ class RatisConsensus implements IConsensus {
     }
   }
 
-  private boolean shouldRetry(RaftClientReply reply) {
-    // currently, we only retry when ResourceUnavailableException is caught
-    return !reply.isSuccess() && (reply.getException() instanceof 
ResourceUnavailableException);
-  }
-
   /** launch a consensus write with retry mechanism */
   private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, 
IOException> caller)
       throws IOException {
-
-    final int maxRetryTimes = config.getImpl().getRetryTimesMax();
-    final long waitMillis = config.getImpl().getRetryWaitMillis();
-
-    int retry = 0;
     RaftClientReply reply = null;
-    while (retry < maxRetryTimes) {
-      retry++;
-
-      reply = caller.get();
-      if (!shouldRetry(reply)) {
-        return reply;
-      }
-      logger.debug("{} sending write request with retry = {} and reply = {}", 
this, retry, reply);
-
-      try {
-        Thread.sleep(waitMillis);
-      } catch (InterruptedException e) {
-        logger.warn("{} retry write sleep is interrupted: {}", this, e);
-        Thread.currentThread().interrupt();
-      }
+    try {
+      reply = Retriable.attempt(caller, writeRetryPolicy, () -> caller, 
logger);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      logger.debug("{}: interrupted when retrying for write request {}", this, 
caller);
     }
+
     if (reply == null) {
       return RaftClientReply.newBuilder()
           .setSuccess(false)
@@ -360,7 +366,12 @@ class RatisConsensus implements IConsensus {
     RaftClientReply reply;
     try (AutoCloseable ignored =
         RatisMetricsManager.getInstance().startReadTimer(consensusGroupType)) {
-      reply = server.submitClientRequest(request);
+      reply =
+          Retriable.attempt(
+              () -> server.submitClientRequest(request),
+              readRetryPolicy,
+              () -> readRequest,
+              logger);
     }
 
     // rethrow the exception if the reply is not successful
@@ -590,15 +601,27 @@ class RatisConsensus implements IConsensus {
       logger.info("isLeaderReady checking failed with exception: ", e);
       return false;
     }
-    long startTime = System.currentTimeMillis();
+
+    final long startTime = System.currentTimeMillis();
+    final BooleanSupplier noRetryAtAnyOfFollowingCondition =
+        () ->
+            Utils.anyOf(
+                // this peer is not a leader
+                () -> !divisionInfo.isLeader(),
+                // this peer is a ready leader
+                () -> divisionInfo.isLeader() && divisionInfo.isLeaderReady(),
+                // reaches max retry timeout
+                () -> System.currentTimeMillis() - startTime >= 
DEFAULT_WAIT_LEADER_READY_TIMEOUT);
+
     try {
-      while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
-        Thread.sleep(10);
-        long consumedTime = System.currentTimeMillis() - startTime;
-        if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) {
-          logger.warn("{}: leader is still not ready after {}ms", groupId, 
consumedTime);
-          return false;
-        }
+      Retriable.attemptUntilTrue(
+          noRetryAtAnyOfFollowingCondition,
+          TimeDuration.valueOf(10, TimeUnit.MILLISECONDS),
+          "waitLeaderReady",
+          logger);
+      if (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
+        logger.warn(
+            "{}: leader is still not ready after {}ms", groupId, 
DEFAULT_WAIT_LEADER_READY_TIMEOUT);
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Retriable.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Retriable.java
new file mode 100644
index 00000000000..05757443804
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Retriable.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis.utils;
+
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+/** Static helpers that re-invoke an operation until its result is acceptable. */
+public final class Retriable {
+
+  private Retriable() {
+    // static utility class, not meant to be instantiated
+  }
+
+  /**
+   * Attempt the given operation {@code supplier}. If the result is not expected (as indicated via
+   * {@code shouldRetry}), sleep for {@code sleepTime} and then retry the operation.
+   *
+   * <p>The first invocation is not counted as a retry, so the operation is invoked at most {@code
+   * maxAttempts + 1} times. The result of the last invocation is returned even if {@code
+   * shouldRetry} still rejects it.
+   *
+   * @param supplier the operation to invoke.
+   * @param shouldRetry returns true if the given result should trigger another attempt.
+   * @param maxAttempts max retry attempts after the first invocation; 0 disables retrying and -1
+   *     retries indefinitely.
+   * @param sleepTime sleep time between two consecutive attempts; must not be negative.
+   * @param name supplies the operation's name for log messages; may be null.
+   * @param log the logger to print messages; may be null.
+   * @return the result of the last invocation of the operation.
+   * @throws InterruptedException if the sleep is interrupted. The thread's interrupt flag is
+   *     restored before rethrowing.
+   * @throws THROWABLE if the operation throws a pre-defined error.
+   */
+  public static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+      CheckedSupplier<RETURN, THROWABLE> supplier,
+      Predicate<RETURN> shouldRetry,
+      int maxAttempts,
+      TimeDuration sleepTime,
+      Supplier<?> name,
+      Logger log)
+      throws THROWABLE, InterruptedException {
+    Objects.requireNonNull(supplier, "supplier == null");
+    Objects.requireNonNull(shouldRetry, "shouldRetry == null");
+    Objects.requireNonNull(sleepTime, "sleepTime == null");
+    // 0 is a valid budget ("no retry"); previously it was rejected, so a default RetryPolicy
+    // (maxAttempts = 0) or a zero retry configuration failed on first use.
+    Preconditions.assertTrue(maxAttempts >= -1, () -> "maxAttempts = " + maxAttempts + " < -1");
+    Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + sleepTime + " < 0");
+
+    for (int i = 1; /* Forever Loop */ ; i++) {
+      try {
+        final RETURN ret = supplier.get();
+        // retry only if the result is rejected and the retry budget is not exhausted yet
+        if (shouldRetry.test(ret) && (maxAttempts == -1 || i <= maxAttempts)) {
+          if (log != null && log.isDebugEnabled()) {
+            log.debug(
+                "Failed {}, attempt #{}, sleep {} and then retry", nameOf(name), i, sleepTime);
+          }
+          sleepTime.sleep();
+          continue;
+        }
+        return ret;
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        if (log != null && log.isWarnEnabled()) {
+          log.warn("{}: interrupted when waiting for retry", nameOf(name));
+        }
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Attempt indefinitely until the given {@code condition} holds, sleeping {@code sleepTime}
+   * between two consecutive checks.
+   *
+   * @param condition the condition to poll.
+   * @param sleepTime sleep time between two consecutive checks; must not be negative.
+   * @param name the operation's name for log messages.
+   * @param log the logger to print messages; may be null.
+   * @throws InterruptedException if the sleep is interrupted.
+   */
+  public static void attemptUntilTrue(
+      BooleanSupplier condition, TimeDuration sleepTime, String name, Logger log)
+      throws InterruptedException {
+    Objects.requireNonNull(condition, "condition == null");
+    attempt(() -> null, ret -> !condition.getAsBoolean(), -1, sleepTime, () -> name, log);
+  }
+
+  /**
+   * Attempt the given operation {@code supplier}, retrying according to {@code policy}. See {@link
+   * #attempt(CheckedSupplier, Predicate, int, TimeDuration, Supplier, Logger)} for the exact retry
+   * semantics.
+   *
+   * @param supplier the operation to invoke.
+   * @param policy decides whether a result is retried, how many retries are allowed and how long
+   *     to wait between attempts.
+   * @param name supplies the operation's name for log messages; may be null.
+   * @param logger the logger to print messages; may be null.
+   * @return the result of the last invocation of the operation.
+   * @throws InterruptedException if the sleep is interrupted.
+   * @throws THROWABLE if the operation throws a pre-defined error.
+   */
+  public static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+      CheckedSupplier<RETURN, THROWABLE> supplier,
+      RetryPolicy<RETURN> policy,
+      Supplier<?> name,
+      Logger logger)
+      throws THROWABLE, InterruptedException {
+    Objects.requireNonNull(policy, "policy == null");
+    return attempt(
+        supplier, policy::shouldRetry, policy.getMaxAttempts(), policy.getWaitTime(), name, logger);
+  }
+
+  /** Resolves the operation name for logging, tolerating a null supplier. */
+  private static Object nameOf(Supplier<?> name) {
+    return name == null ? null : name.get();
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RetryPolicy.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RetryPolicy.java
new file mode 100644
index 00000000000..a8da79d2fae
--- /dev/null
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RetryPolicy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis.utils;
+
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.function.Function;
+
+/**
+ * Describes when and how {@link Retriable} should retry an operation that produced a response of
+ * type {@code RESP}. A policy combines a handler that decides whether a response is retried, a
+ * retry budget, and a wait time between attempts.
+ *
+ * @param <RESP> the type of the response inspected by the retry handler.
+ */
+public class RetryPolicy<RESP> {
+  /** Decides whether a response should be retried. */
+  private final Function<RESP, Boolean> retryHandler;
+  /** Max retry attempts after the first invocation; -1 means retry indefinitely. */
+  private final int maxAttempts;
+  /** Time to wait between two consecutive attempts. */
+  private final TimeDuration waitTime;
+
+  /**
+   * @param retryHandler returns true if the given response should be retried; must not be null.
+   * @param maxAttempts max retry attempts after the first invocation; -1 means retry indefinitely.
+   * @param waitTime time to wait between two consecutive attempts; must not be null.
+   * @throws NullPointerException if {@code retryHandler} or {@code waitTime} is null, so that a
+   *     broken policy fails at construction instead of at its first use.
+   */
+  public RetryPolicy(
+      Function<RESP, Boolean> retryHandler, int maxAttempts, TimeDuration waitTime) {
+    this.retryHandler = java.util.Objects.requireNonNull(retryHandler, "retryHandler == null");
+    this.maxAttempts = maxAttempts;
+    this.waitTime = java.util.Objects.requireNonNull(waitTime, "waitTime == null");
+  }
+
+  /**
+   * Returns whether {@code resp} should be retried. A {@code null} verdict from the handler is
+   * treated as "do not retry" rather than failing with a NullPointerException on unboxing.
+   */
+  boolean shouldRetry(RESP resp) {
+    return Boolean.TRUE.equals(retryHandler.apply(resp));
+  }
+
+  /** Returns the max retry attempts after the first invocation; -1 means retry indefinitely. */
+  public int getMaxAttempts() {
+    return maxAttempts;
+  }
+
+  /** Returns the time to wait between two consecutive attempts. */
+  public TimeDuration getWaitTime() {
+    return waitTime;
+  }
+
+  /**
+   * Returns a builder whose defaults never retry ({@code maxAttempts} 0, wait time {@link
+   * TimeDuration#ZERO}).
+   */
+  public static <RESP> RetryPolicyBuilder<RESP> newBuilder() {
+    return new RetryPolicyBuilder<>();
+  }
+
+  /** Mutable builder for {@link RetryPolicy}. */
+  public static class RetryPolicyBuilder<RESP> {
+    private Function<RESP, Boolean> retryHandler = (r) -> false;
+    private int maxAttempts = 0;
+    private TimeDuration waitTime = TimeDuration.ZERO;
+
+    /** Sets the handler that returns true if a response should be retried. */
+    public RetryPolicyBuilder<RESP> setRetryHandler(Function<RESP, Boolean> retryHandler) {
+      this.retryHandler = retryHandler;
+      return this;
+    }
+
+    /** Sets the max retry attempts after the first invocation; -1 retries indefinitely. */
+    public RetryPolicyBuilder<RESP> setMaxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    /** Sets the time to wait between two consecutive attempts. */
+    public RetryPolicyBuilder<RESP> setWaitTime(TimeDuration waitTime) {
+      this.waitTime = waitTime;
+      return this;
+    }
+
+    /** Builds an immutable {@link RetryPolicy} from the current builder settings. */
+    public RetryPolicy<RESP> build() {
+      return new RetryPolicy<>(retryHandler, maxAttempts, waitTime);
+    }
+  }
+}
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index fa841629090..5390420b304 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -48,6 +48,7 @@ import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 public class Utils {
@@ -319,4 +320,13 @@ public class Utils {
     final TimeDuration clientMaxRetryGap = 
getMaxRetrySleepTime(config.getClient());
     RaftServerConfigKeys.RetryCache.setExpiryTime(properties, 
clientMaxRetryGap);
   }
+
+  /**
+   * Returns true if at least one of the given conditions holds. Conditions are evaluated in order
+   * and evaluation stops at the first one that returns true; an empty argument list yields false.
+   */
+  public static boolean anyOf(BooleanSupplier... conditions) {
+    int next = 0;
+    while (next < conditions.length) {
+      if (conditions[next++].getAsBoolean()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index 93a972204c2..55d12730ffa 100644
--- 
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ 
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -198,8 +198,7 @@ public class RecoverReadTest {
     // restart the cluster
     miniCluster.restart();
 
-    // query during redo: get exception that ratis is under recovery
-    Assert.assertThrows(RatisUnderRecoveryException.class, () -> 
miniCluster.readThrough(0));
+    Assert.assertEquals(10, miniCluster.mustRead(0));
   }
 
   @Test

Reply via email to