This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8901076ec17 [IOTDB-6227] Add retry for RatisConsensus read (#11458)
8901076ec17 is described below
commit 8901076ec17f2a54c393567e1fcd51f099a176f8
Author: William Song <[email protected]>
AuthorDate: Fri Nov 10 11:10:44 2023 +0800
[IOTDB-6227] Add retry for RatisConsensus read (#11458)
---
.../ratis/ApplicationStateMachineProxy.java | 16 +--
.../iotdb/consensus/ratis/RatisConsensus.java | 91 +++++++++++-------
.../iotdb/consensus/ratis/utils/Retriable.java | 107 +++++++++++++++++++++
.../iotdb/consensus/ratis/utils/RetryPolicy.java | 79 +++++++++++++++
.../apache/iotdb/consensus/ratis/utils/Utils.java | 10 ++
.../iotdb/consensus/ratis/RecoverReadTest.java | 3 +-
6 files changed, 262 insertions(+), 44 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
index fda00ab8504..1460398ab05 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.Retriable;
import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -46,6 +47,7 @@ import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +58,6 @@ import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class ApplicationStateMachineProxy extends BaseStateMachine {
@@ -182,13 +183,12 @@ public class ApplicationStateMachineProxy extends
BaseStateMachine {
}
+  /**
+   * Blocks while Utils.stallApply() reports true, re-checking the condition once per minute via
+   * Retriable.attemptUntilTrue. If the wait is interrupted, a warning is logged, the interrupt
+   * flag is restored and the method returns without waiting any further.
+   */
private void waitUntilSystemAllowApply() {
- while (Utils.stallApply()) {
- try {
- TimeUnit.SECONDS.sleep(60);
- } catch (InterruptedException e) {
- logger.warn("{}: interrupted when waiting until system ready: ", this,
e);
- Thread.currentThread().interrupt();
- }
+ try {
+ Retriable.attemptUntilTrue(
+ () -> !Utils.stallApply(), TimeDuration.ONE_MINUTE,
"waitUntilSystemAllowApply", logger);
+ // NOTE(review): Retriable.attempt already logs a warning and re-sets the interrupt flag
+ // before rethrowing InterruptedException, so this handler repeats both.
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted when waiting until system ready: ", this,
e);
+ Thread.currentThread().interrupt();
}
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index a4bbf19cb8a..a4aece95010 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -48,6 +48,8 @@ import
org.apache.iotdb.consensus.exception.RatisRequestFailedException;
import org.apache.iotdb.consensus.exception.RatisUnderRecoveryException;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricSet;
import org.apache.iotdb.consensus.ratis.metrics.RatisMetricsManager;
+import org.apache.iotdb.consensus.ratis.utils.Retriable;
+import org.apache.iotdb.consensus.ratis.utils.RetryPolicy;
import org.apache.iotdb.consensus.ratis.utils.Utils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -94,6 +96,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
/** A multi-raft consensus implementation based on Apache Ratis. */
@@ -124,6 +127,8 @@ class RatisConsensus implements IConsensus {
private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int)
TimeUnit.SECONDS.toMillis(20);
private final RatisConfig config;
+ private final RetryPolicy<RaftClientReply> readRetryPolicy;
+ private final RetryPolicy<RaftClientReply> writeRetryPolicy;
private final RatisMetricSet ratisMetricSet;
private final TConsensusGroupType consensusGroupType;
@@ -145,6 +150,26 @@ class RatisConsensus implements IConsensus {
this.config = config.getRatisConfig();
this.consensusGroupType = config.getConsensusGroupType();
this.ratisMetricSet = new RatisMetricSet();
+ this.readRetryPolicy =
+ RetryPolicy.<RaftClientReply>newBuilder()
+ .setRetryHandler(c -> !c.isSuccess() && c.getException()
instanceof ReadIndexException)
+ .setMaxAttempts(this.config.getImpl().getRetryTimesMax())
+ .setWaitTime(
+ TimeDuration.valueOf(
+ this.config.getImpl().getRetryWaitMillis(),
TimeUnit.MILLISECONDS))
+ .build();
+ this.writeRetryPolicy =
+ RetryPolicy.<RaftClientReply>newBuilder()
+ // currently, we only retry when ResourceUnavailableException is
caught
+ .setRetryHandler(
+ reply ->
+ !reply.isSuccess()
+ && (reply.getException() instanceof
ResourceUnavailableException))
+ .setMaxAttempts(this.config.getImpl().getRetryTimesMax())
+ .setWaitTime(
+ TimeDuration.valueOf(
+ this.config.getImpl().getRetryWaitMillis(),
TimeUnit.MILLISECONDS))
+ .build();
this.diskGuardian = new DiskGuardian(() -> this, this.config);
@@ -188,36 +213,17 @@ class RatisConsensus implements IConsensus {
}
}
- private boolean shouldRetry(RaftClientReply reply) {
- // currently, we only retry when ResourceUnavailableException is caught
- return !reply.isSuccess() && (reply.getException() instanceof
ResourceUnavailableException);
- }
-
/** launch a consensus write with retry mechanism */
private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply,
IOException> caller)
throws IOException {
-
- final int maxRetryTimes = config.getImpl().getRetryTimesMax();
- final long waitMillis = config.getImpl().getRetryWaitMillis();
-
- int retry = 0;
RaftClientReply reply = null;
- while (retry < maxRetryTimes) {
- retry++;
-
- reply = caller.get();
- if (!shouldRetry(reply)) {
- return reply;
- }
- logger.debug("{} sending write request with retry = {} and reply = {}",
this, retry, reply);
-
- try {
- Thread.sleep(waitMillis);
- } catch (InterruptedException e) {
- logger.warn("{} retry write sleep is interrupted: {}", this, e);
- Thread.currentThread().interrupt();
- }
+ try {
+ reply = Retriable.attempt(caller, writeRetryPolicy, () -> caller,
logger);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.debug("{}: interrupted when retrying for write request {}", this,
caller);
}
+
if (reply == null) {
return RaftClientReply.newBuilder()
.setSuccess(false)
@@ -360,7 +366,12 @@ class RatisConsensus implements IConsensus {
RaftClientReply reply;
try (AutoCloseable ignored =
RatisMetricsManager.getInstance().startReadTimer(consensusGroupType)) {
- reply = server.submitClientRequest(request);
+ reply =
+ Retriable.attempt(
+ () -> server.submitClientRequest(request),
+ readRetryPolicy,
+ () -> readRequest,
+ logger);
}
// rethrow the exception if the reply is not successful
@@ -590,15 +601,27 @@ class RatisConsensus implements IConsensus {
logger.info("isLeaderReady checking failed with exception: ", e);
return false;
}
- long startTime = System.currentTimeMillis();
+
+ final long startTime = System.currentTimeMillis();
+ final BooleanSupplier noRetryAtAnyOfFollowingCondition =
+ () ->
+ Utils.anyOf(
+ // this peer is not a leader
+ () -> !divisionInfo.isLeader(),
+ // this peer is a ready leader
+ () -> divisionInfo.isLeader() && divisionInfo.isLeaderReady(),
+ // reaches max retry timeout
+ () -> System.currentTimeMillis() - startTime >=
DEFAULT_WAIT_LEADER_READY_TIMEOUT);
+
try {
- while (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
- Thread.sleep(10);
- long consumedTime = System.currentTimeMillis() - startTime;
- if (consumedTime >= DEFAULT_WAIT_LEADER_READY_TIMEOUT) {
- logger.warn("{}: leader is still not ready after {}ms", groupId,
consumedTime);
- return false;
- }
+ Retriable.attemptUntilTrue(
+ noRetryAtAnyOfFollowingCondition,
+ TimeDuration.valueOf(10, TimeUnit.MILLISECONDS),
+ "waitLeaderReady",
+ logger);
+ if (divisionInfo.isLeader() && !divisionInfo.isLeaderReady()) {
+ logger.warn(
+ "{}: leader is still not ready after {}ms", groupId,
DEFAULT_WAIT_LEADER_READY_TIMEOUT);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Retriable.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Retriable.java
new file mode 100644
index 00000000000..05757443804
--- /dev/null
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Retriable.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis.utils;
+
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.function.CheckedSupplier;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.function.BooleanSupplier;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+public class Retriable {
+  /**
+   * Attempt the given operation {@code supplier}. If the result is not expected (as indicated via
+   * {@code shouldRetry}), sleep {@code sleepTime} and then retry this operation.
+   *
+   * <p>{@code supplier} is invoked at most {@code maxAttempts} times in total, the first attempt
+   * included. Once that budget is exhausted, the result of the last attempt is returned even if
+   * {@code shouldRetry} still rejects it.
+   *
+   * @param supplier the operation to attempt.
+   * @param shouldRetry returns true if the given result requires another attempt.
+   * @param maxAttempts max total attempts, including the first one. <b>-1 indicates retrying
+   *     indefinitely.</b>
+   * @param sleepTime sleep time between two consecutive attempts; must not be negative.
+   * @param name supplies the operation's name for log messages.
+   * @param log the logger to print messages; {@code null} disables logging.
+   * @throws InterruptedException if the sleep is interrupted; the interrupt flag is restored.
+   * @throws THROWABLE if the operation throws a pre-defined error.
+   * @return the result of the last attempt
+   */
+  public static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+      CheckedSupplier<RETURN, THROWABLE> supplier,
+      Predicate<RETURN> shouldRetry,
+      int maxAttempts,
+      TimeDuration sleepTime,
+      Supplier<?> name,
+      Logger log)
+      throws THROWABLE, InterruptedException {
+    Objects.requireNonNull(supplier, "supplier == null");
+    Objects.requireNonNull(shouldRetry, "shouldRetry == null");
+    Objects.requireNonNull(sleepTime, "sleepTime == null");
+    Preconditions.assertTrue(
+        maxAttempts == -1 || maxAttempts > 0,
+        () -> "maxAttempts = " + maxAttempts + " is neither -1 nor positive");
+    Preconditions.assertTrue(!sleepTime.isNegative(), () -> "sleepTime = " + sleepTime + " < 0");
+
+    for (int i = 1; /* Forever Loop */ ; i++) {
+      try {
+        final RETURN ret = supplier.get();
+        if (!shouldRetry.test(ret)) {
+          return ret;
+        }
+        // i attempts have been made so far; never exceed maxAttempts invocations in total
+        if (maxAttempts != -1 && i >= maxAttempts) {
+          if (log != null && log.isDebugEnabled()) {
+            log.debug("Failed {} after {} attempts, give up retrying", name.get(), i);
+          }
+          return ret;
+        }
+        if (log != null && log.isDebugEnabled()) {
+          log.debug("Failed {}, attempt #{}, sleep {} and then retry", name.get(), i, sleepTime);
+        }
+        sleepTime.sleep();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        if (log != null && log.isWarnEnabled()) {
+          log.warn("{}: interrupted when waiting for retry", name.get());
+        }
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Attempt indefinitely until the given {@code condition} holds, sleeping {@code sleepTime}
+   * between two consecutive checks.
+   *
+   * @param condition the condition to wait for.
+   * @param sleepTime sleep time between two consecutive checks; must not be negative.
+   * @param name the operation's name, used in log messages.
+   * @param log the logger to print messages; {@code null} disables logging.
+   * @throws InterruptedException if the sleep is interrupted; the interrupt flag is restored.
+   */
+  public static void attemptUntilTrue(
+      BooleanSupplier condition, TimeDuration sleepTime, String name, Logger log)
+      throws InterruptedException {
+    Objects.requireNonNull(condition, "condition == null");
+    attempt(() -> null, ret -> !condition.getAsBoolean(), -1, sleepTime, () -> name, log);
+  }
+
+  /**
+   * Attempt the given operation {@code supplier}. May retry several times according to the given
+   * retry policy {@code policy}.
+   *
+   * @param supplier the operation to attempt.
+   * @param policy decides when to retry, how many attempts are allowed and how long to wait.
+   * @param name supplies the operation's name for log messages.
+   * @param logger the logger to print messages; {@code null} disables logging.
+   * @throws InterruptedException if the sleep is interrupted; the interrupt flag is restored.
+   * @throws THROWABLE if the operation throws a pre-defined error.
+   * @return the result of the last attempt
+   */
+  public static <RETURN, THROWABLE extends Throwable> RETURN attempt(
+      CheckedSupplier<RETURN, THROWABLE> supplier,
+      RetryPolicy<RETURN> policy,
+      Supplier<?> name,
+      Logger logger)
+      throws THROWABLE, InterruptedException {
+    return attempt(
+        supplier, policy::shouldRetry, policy.getMaxAttempts(), policy.getWaitTime(), name, logger);
+  }
+}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RetryPolicy.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RetryPolicy.java
new file mode 100644
index 00000000000..a8da79d2fae
--- /dev/null
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/RetryPolicy.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis.utils;
+
+import org.apache.ratis.util.TimeDuration;
+
+import java.util.function.Function;
+
+/**
+ * Describes how an operation producing {@code RESP} should be retried: which responses trigger a
+ * retry, how many attempts are allowed in total and how long to wait between attempts.
+ */
+public class RetryPolicy<RESP> {
+  /** Answers whether a given response requires another attempt. */
+  private final Function<RESP, Boolean> retryHandler;
+  /** Max total attempts, including the first one. -1 means retry indefinitely. */
+  private final int maxAttempts;
+  /** Time to wait between two consecutive attempts. */
+  private final TimeDuration waitTime;
+
+  /**
+   * Creates a retry policy.
+   *
+   * @param retryHandler decides from a response whether to retry; must not be null.
+   * @param maxAttempts max total attempts, including the first one (-1 for indefinitely).
+   * @param waitTime wait time between two consecutive attempts; must not be null.
+   */
+  public RetryPolicy(Function<RESP, Boolean> retryHandler, int maxAttempts, TimeDuration waitTime) {
+    this.retryHandler = java.util.Objects.requireNonNull(retryHandler, "retryHandler == null");
+    this.maxAttempts = maxAttempts;
+    this.waitTime = java.util.Objects.requireNonNull(waitTime, "waitTime == null");
+  }
+
+  /**
+   * Returns true only if the handler explicitly answers {@code TRUE}; a {@code null} answer is
+   * treated as "do not retry" instead of failing on unboxing.
+   */
+  boolean shouldRetry(RESP resp) {
+    return Boolean.TRUE.equals(retryHandler.apply(resp));
+  }
+
+  /** Returns the max total attempts, including the first one; -1 means indefinitely. */
+  public int getMaxAttempts() {
+    return maxAttempts;
+  }
+
+  /** Returns the wait time between two consecutive attempts. */
+  public TimeDuration getWaitTime() {
+    return waitTime;
+  }
+
+  public static <RESP> RetryPolicyBuilder<RESP> newBuilder() {
+    return new RetryPolicyBuilder<>();
+  }
+
+  /** Builder for {@link RetryPolicy}; by default it never retries and makes a single attempt. */
+  public static class RetryPolicyBuilder<RESP> {
+    private Function<RESP, Boolean> retryHandler = (r) -> false;
+    // one attempt (no retry) by default; 0 is not a valid attempt count for Retriable.attempt
+    private int maxAttempts = 1;
+    private TimeDuration waitTime = TimeDuration.ZERO;
+
+    public RetryPolicyBuilder<RESP> setRetryHandler(Function<RESP, Boolean> retryHandler) {
+      this.retryHandler = retryHandler;
+      return this;
+    }
+
+    public RetryPolicyBuilder<RESP> setMaxAttempts(int maxAttempts) {
+      this.maxAttempts = maxAttempts;
+      return this;
+    }
+
+    public RetryPolicyBuilder<RESP> setWaitTime(TimeDuration waitTime) {
+      this.waitTime = waitTime;
+      return this;
+    }
+
+    public RetryPolicy<RESP> build() {
+      return new RetryPolicy<>(retryHandler, maxAttempts, waitTime);
+    }
+  }
+}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
index fa841629090..5390420b304 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/utils/Utils.java
@@ -48,6 +48,7 @@ import java.io.File;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
public class Utils {
@@ -319,4 +320,13 @@ public class Utils {
final TimeDuration clientMaxRetryGap =
getMaxRetrySleepTime(config.getClient());
RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
clientMaxRetryGap);
}
+
+  /**
+   * Reports whether at least one of the given conditions holds. Conditions are evaluated in
+   * order, and evaluation stops at the first one that returns true.
+   */
+  public static boolean anyOf(BooleanSupplier... conditions) {
+    int idx = 0;
+    while (idx < conditions.length) {
+      final BooleanSupplier current = conditions[idx++];
+      if (current.getAsBoolean()) {
+        return true;
+      }
+    }
+    return false;
+  }
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
index 93a972204c2..55d12730ffa 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RecoverReadTest.java
@@ -198,8 +198,7 @@ public class RecoverReadTest {
// restart the cluster
miniCluster.restart();
- // query during redo: get exception that ratis is under recovery
- Assert.assertThrows(RatisUnderRecoveryException.class, () ->
miniCluster.readThrough(0));
+ Assert.assertEquals(10, miniCluster.mustRead(0));
}
@Test