This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 326fc81d7a [IOTDB-3642] Add retry mechanism when resource unavailable (#7240)
326fc81d7a is described below
commit 326fc81d7a8c97ce05e297be21d5224b1257e830
Author: William Song <[email protected]>
AuthorDate: Wed Sep 7 08:44:06 2022 +0800
[IOTDB-3642] Add retry mechanism when resource unavailable (#7240)
---
.../apache/iotdb/consensus/config/RatisConfig.java | 59 +++++++++++++++++++++-
.../iotdb/consensus/ratis/RatisConsensus.java | 53 ++++++++++++++++++-
2 files changed, 108 insertions(+), 4 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
index e976137287..877a361c4a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/RatisConfig.java
@@ -34,6 +34,7 @@ public class RatisConfig {
private final ThreadPool threadPool;
private final Log log;
private final Grpc grpc;
+ private final RatisConsensus ratisConsensus;
private RatisConfig(
Rpc rpc,
@@ -41,13 +42,15 @@ public class RatisConfig {
Snapshot snapshot,
ThreadPool threadPool,
Log log,
- Grpc grpc) {
+ Grpc grpc,
+ RatisConsensus ratisConsensus) {
this.rpc = rpc;
this.leaderElection = leaderElection;
this.snapshot = snapshot;
this.threadPool = threadPool;
this.log = log;
this.grpc = grpc;
+ this.ratisConsensus = ratisConsensus;
}
public Rpc getRpc() {
@@ -74,6 +77,10 @@ public class RatisConfig {
return grpc;
}
+ public RatisConsensus getRatisConsensus() {
+ return ratisConsensus;
+ }
+
public static Builder newBuilder() {
return new Builder();
}
@@ -85,6 +92,7 @@ public class RatisConfig {
private ThreadPool threadPool;
private Log log;
private Grpc grpc;
+ private RatisConsensus ratisConsensus;
public RatisConfig build() {
return new RatisConfig(
@@ -93,7 +101,8 @@ public class RatisConfig {
snapshot != null ? snapshot : Snapshot.newBuilder().build(),
threadPool != null ? threadPool : ThreadPool.newBuilder().build(),
log != null ? log : Log.newBuilder().build(),
- grpc != null ? grpc : Grpc.newBuilder().build());
+ grpc != null ? grpc : Grpc.newBuilder().build(),
+ ratisConsensus != null ? ratisConsensus : RatisConsensus.newBuilder().build());
}
public Builder setRpc(Rpc rpc) {
@@ -125,6 +134,11 @@ public class RatisConfig {
this.grpc = grpc;
return this;
}
+
+ public Builder setRatisConsensus(RatisConsensus ratisConsensus) {
+ this.ratisConsensus = ratisConsensus;
+ return this;
+ }
}
/** server rpc timeout related */
@@ -692,4 +706,45 @@ public class RatisConfig {
}
}
}
+
+ public static class RatisConsensus {
+ private final int retryTimesMax;
+ private final long retryWaitMillis;
+
+ private RatisConsensus(int retryTimesMax, long retryWaitMillis) {
+ this.retryTimesMax = retryTimesMax;
+ this.retryWaitMillis = retryWaitMillis;
+ }
+
+ public int getRetryTimesMax() {
+ return retryTimesMax;
+ }
+
+ public long getRetryWaitMillis() {
+ return retryWaitMillis;
+ }
+
+ public static RatisConsensus.Builder newBuilder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ private int retryTimesMax = 3;
+ private long retryWaitMillis = 500;
+
+ public RatisConsensus build() {
+ return new RatisConsensus(retryTimesMax, retryWaitMillis);
+ }
+
+ public RatisConsensus.Builder setRetryTimesMax(int retryTimesMax) {
+ this.retryTimesMax = retryTimesMax;
+ return this;
+ }
+
+ public RatisConsensus.Builder setRetryWaitMillis(long retryWaitMillis) {
+ this.retryWaitMillis = retryWaitMillis;
+ return this;
+ }
+ }
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index fcab610918..1abf6a473a 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
import org.apache.iotdb.consensus.config.ConsensusConfig;
+import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
import org.apache.iotdb.consensus.exception.NodeReadOnlyException;
@@ -62,9 +63,11 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SnapshotManagementRequest;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,6 +114,8 @@ class RatisConsensus implements IConsensus {
// TODO make it configurable
private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
+ private final RatisConfig config;
+
public RatisConsensus(ConsensusConfig config, IStateMachine.Registry registry)
throws IOException {
myself = Utils.fromTEndPointAndPriorityToRaftPeer(config.getThisNode(), DEFAULT_PRIORITY);
@@ -122,6 +127,7 @@ class RatisConsensus implements IConsensus {
GrpcConfigKeys.Server.setPort(properties, config.getThisNode().getPort());
Utils.initRatisConfig(properties, config.getRatisConfig());
+ this.config = config.getRatisConfig();
clientRpc = new GrpcFactory(new Parameters()).newRaftClientRpc(ClientId.randomId(), properties);
@@ -148,6 +154,49 @@ class RatisConsensus implements IConsensus {
server.close();
}
+ private boolean shouldRetry(RaftClientReply reply) {
+ // currently, we only retry when ResourceUnavailableException is caught
+ return !reply.isSuccess()
+ && (reply.getException() != null
+ && reply.getException() instanceof ResourceUnavailableException);
+ }
+ /** launch a consensus write with retry mechanism */
+ private RaftClientReply writeWithRetry(CheckedSupplier<RaftClientReply, IOException> caller)
+ throws IOException {
+
+ final int maxRetryTimes = config.getRatisConsensus().getRetryTimesMax();
+ final long waitMillis = config.getRatisConsensus().getRetryWaitMillis();
+
+ int retry = 0;
+ RaftClientReply reply = null;
+ while (retry < maxRetryTimes) {
+ retry++;
+
+ reply = caller.get();
+ if (!shouldRetry(reply)) {
+ return reply;
+ }
+ logger.debug("{} sending write request with retry = {} and reply = {}", this, retry, reply);
+
+ try {
+ Thread.sleep(waitMillis);
+ } catch (InterruptedException e) {
+ logger.warn("{} retry write sleep is interrupted: {}", this, e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ return reply;
+ }
+
+ private RaftClientReply writeLocallyWithRetry(RaftClientRequest request) throws IOException {
+ return writeWithRetry(() -> server.submitClientRequest(request));
+ }
+
+ private RaftClientReply writeRemotelyWithRetry(RatisClient client, Message message)
+ throws IOException {
+ return writeWithRetry(() -> client.getRaftClient().io().send(message));
+ }
+
/**
* write will first send request to local server use method call if local server is not leader, it
* will use RaftClient to send RPC to read leader
@@ -183,7 +232,7 @@ class RatisConsensus implements IConsensus {
RaftPeer suggestedLeader = null;
if (isLeader(consensusGroupId) && waitUntilLeaderReady(raftGroupId)) {
try {
- localServerReply = server.submitClientRequest(clientRequest);
+ localServerReply = writeLocallyWithRetry(clientRequest);
if (localServerReply.isSuccess()) {
ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
@@ -203,7 +252,7 @@ class RatisConsensus implements IConsensus {
RatisClient client = null;
try {
client = getRaftClient(raftGroup);
- RaftClientReply reply = client.getRaftClient().io().send(message);
+ RaftClientReply reply = writeRemotelyWithRetry(client, message);
if (!reply.isSuccess()) {
return failedWrite(new RatisRequestFailedException(reply.getException()));
}