This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/1.2 by this push:
new 1d13886a01a [To rel/1.2][IOTDB-6116] Disassociate the IoTConsensus
retry logic from the forkjoinPool (#10872) (#10878)
1d13886a01a is described below
commit 1d13886a01a3cf84cd9d034446b9dff7131488ac
Author: Potato <[email protected]>
AuthorDate: Thu Aug 17 10:12:59 2023 +0800
[To rel/1.2][IOTDB-6116] Disassociate the IoTConsensus retry logic from the
forkjoinPool (#10872) (#10878)
Signed-off-by: OneSizeFitQuorum <[email protected]>
---
.../apache/iotdb/consensus/iot/IoTConsensus.java | 17 +++++++
.../consensus/iot/IoTConsensusServerImpl.java | 8 ++++
.../consensus/iot/client/DispatchLogHandler.java | 52 +++++++++++-----------
.../consensus/iot/logdispatcher/LogDispatcher.java | 4 ++
.../iotdb/consensus/ratis/RatisConsensus.java | 7 +--
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/commons/concurrent/ThreadName.java | 6 +--
7 files changed, 59 insertions(+), 37 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 047d557ad55..3afc4590c7b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.consensus.iot;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.RegisterManager;
@@ -64,6 +66,8 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class IoTConsensus implements IConsensus {
@@ -81,6 +85,7 @@ public class IoTConsensus implements IConsensus {
private final IoTConsensusConfig config;
private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager;
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
+ private final ScheduledExecutorService retryService;
public IoTConsensus(ConsensusConfig config, Registry registry) {
this.thisNode = config.getThisNodeEndPoint();
@@ -97,6 +102,9 @@ public class IoTConsensus implements IConsensus {
new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
.createClientManager(
new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
+ this.retryService =
+ IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+ ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
// init IoTConsensus memory manager
IoTConsensusMemoryManager.getInstance()
.init(
@@ -133,6 +141,7 @@ public class IoTConsensus implements IConsensus {
new Peer(consensusGroupId, thisNodeId, thisNode),
new ArrayList<>(),
registry.apply(consensusGroupId),
+ retryService,
clientManager,
syncClientManager,
config);
@@ -149,6 +158,13 @@ public class IoTConsensus implements IConsensus {
clientManager.close();
syncClientManager.close();
registerManager.deregisterAll();
+ retryService.shutdown();
+ try {
+ retryService.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
+ Thread.currentThread().interrupt();
+ }
}
@Override
@@ -214,6 +230,7 @@ public class IoTConsensus implements IConsensus {
new Peer(groupId, thisNodeId, thisNode),
peers,
registry.apply(groupId),
+ retryService,
clientManager,
syncClientManager,
config);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 0b76a690dba..92f3584705f 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -80,6 +80,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -112,12 +113,14 @@ public class IoTConsensusServerImpl {
private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager;
private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
private final String consensusGroupId;
+ private final ScheduledExecutorService retryService;
public IoTConsensusServerImpl(
String storageDir,
Peer thisNode,
List<Peer> configuration,
IStateMachine stateMachine,
+ ScheduledExecutorService retryService,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager,
IoTConsensusConfig config) {
@@ -133,6 +136,7 @@ public class IoTConsensusServerImpl {
} else {
persistConfiguration();
}
+ this.retryService = retryService;
this.config = config;
this.consensusGroupId = thisNode.getGroupId().toString();
consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan());
@@ -732,6 +736,10 @@ public class IoTConsensusServerImpl {
return searchIndex;
}
+ public ScheduledExecutorService getRetryService() {
+ return retryService;
+ }
+
public boolean isReadOnly() {
return stateMachine.isReadOnly();
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 94ba349c6d6..f69ea0c2d76 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -29,7 +29,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRes> {
@@ -88,31 +88,29 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
}
private void sleepCorrespondingTimeAndRetryAsynchronous() {
- // TODO handle forever retry
- CompletableFuture.runAsync(
- () -> {
- try {
- long defaultSleepTime =
- (long)
- (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
- * Math.pow(2, retryCount));
- Thread.sleep(
- Math.min(
- defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- logger.warn("Unexpected interruption during retry pending batch");
- }
- if (thread.isStopped()) {
- logger.debug(
- "LogDispatcherThread {} has been stopped, "
- + "we will not retrying this Batch {} after {} times",
- thread.getPeer(),
- batch,
- retryCount);
- } else {
- thread.sendBatchAsync(batch, this);
- }
- });
+ long sleepTime =
+ Math.min(
+ (long)
+ (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
+ * Math.pow(2, retryCount)),
+ thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
+ thread
+ .getImpl()
+ .getRetryService()
+ .schedule(
+ () -> {
+ if (thread.isStopped()) {
+ logger.debug(
+ "LogDispatcherThread {} has been stopped, "
+ + "we will not retrying this Batch {} after {} times",
+ thread.getPeer(),
+ batch,
+ retryCount);
+ } else {
+ thread.sendBatchAsync(batch, this);
+ }
+ },
+ sleepTime,
+ TimeUnit.MILLISECONDS);
}
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index ef88d5a8163..30a2cc2d22c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -299,6 +299,10 @@ public class LogDispatcher {
return stopped;
}
+ public IoTConsensusServerImpl getImpl() {
+ return impl;
+ }
+
@Override
public void run() {
logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 27c8bb0d271..f6819b18097 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -90,7 +90,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -123,7 +122,6 @@ class RatisConsensus implements IConsensus {
/** TODO make it configurable */
private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20);
- private final ExecutorService addExecutor;
private final ScheduledExecutorService diskGuardian;
private final long triggerSnapshotThreshold;
@@ -153,7 +151,6 @@ class RatisConsensus implements IConsensus {
this.ratisMetricSet = new RatisMetricSet();
this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize();
- addExecutor = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName());
diskGuardian =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
@@ -186,10 +183,8 @@ class RatisConsensus implements IConsensus {
@Override
public void stop() throws IOException {
- addExecutor.shutdown();
diskGuardian.shutdown();
try {
- addExecutor.awaitTermination(5, TimeUnit.SECONDS);
diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e);
@@ -197,8 +192,8 @@ class RatisConsensus implements IConsensus {
} finally {
clientManager.close();
server.close();
+ MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
}
- MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
}
private boolean shouldRetry(RaftClientReply reply) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f05c19bfa48..c85043f011c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1071,7 +1071,7 @@ public class IoTDBConfig {
// IoTConsensus Config
private int maxLogEntriesNumPerBatch = 1024;
private int maxSizePerBatch = 16 * 1024 * 1024;
- private int maxPendingBatchesNum = 12;
+ private int maxPendingBatchesNum = 5;
private double maxMemoryRatioForQueue = 0.6;
/** Pipe related */
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 38968b66659..3d26b9e8ee7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,13 +103,13 @@ public enum ThreadName {
IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"),
LOG_DISPATCHER("LogDispatcher"),
+ LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"),
// -------------------------- Ratis --------------------------
// NOTICE: The thread name of ratis cannot be edited here!
// We list the thread name here just for distinguishing what module the thread belongs to.
RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"),
RAFT_SERVER_EXECUTOR("\\d+-server-thread"),
RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"),
- RATIS_ADD("Ratis-Add"),
SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"),
STATE_MACHINE_UPDATER("StateMachineUpdater"),
FOLLOWER_STATE("FollowerState"),
@@ -235,7 +235,8 @@ public enum ThreadName {
IOT_CONSENSUS_RPC_SERVICE,
IOT_CONSENSUS_RPC_PROCESSOR,
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL,
- LOG_DISPATCHER));
+ LOG_DISPATCHER,
+ LOG_DISPATCHER_RETRY_EXECUTOR));
private static final Set<ThreadName> ratisThreadNames =
new HashSet<>(
@@ -243,7 +244,6 @@ public enum ThreadName {
RAFT_SERVER_PROXY_EXECUTOR,
RAFT_SERVER_EXECUTOR,
RAFT_SERVER_CLIENT_EXECUTOR,
- RATIS_ADD,
SEGMENT_RAFT_WORKER,
STATE_MACHINE_UPDATER,
FOLLOWER_STATE,