This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira6116 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 35dadda081e7fa27308a1ecd7def9889d0715c49 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Aug 16 16:22:30 2023 +0800 finish Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../apache/iotdb/consensus/iot/IoTConsensus.java | 17 +++++++ .../consensus/iot/IoTConsensusServerImpl.java | 8 ++++ .../consensus/iot/client/DispatchLogHandler.java | 52 +++++++++++----------- .../consensus/iot/logdispatcher/LogDispatcher.java | 4 ++ .../iotdb/consensus/ratis/RatisConsensus.java | 7 +-- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +- .../iotdb/commons/concurrent/ThreadName.java | 4 +- 7 files changed, 58 insertions(+), 36 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 047d557ad55..3afc4590c7b 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -22,6 +22,8 @@ package org.apache.iotdb.consensus.iot; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; @@ -64,6 +66,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; public class IoTConsensus implements IConsensus { @@ -81,6 +85,7 @@ public class IoTConsensus implements IConsensus { private final 
IoTConsensusConfig config; private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager; private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager; + private final ScheduledExecutorService retryService; public IoTConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNodeEndPoint(); @@ -97,6 +102,9 @@ public class IoTConsensus implements IConsensus { new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>() .createClientManager( new SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig())); + this.retryService = + IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( + ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName()); // init IoTConsensus memory manager IoTConsensusMemoryManager.getInstance() .init( @@ -133,6 +141,7 @@ public class IoTConsensus implements IConsensus { new Peer(consensusGroupId, thisNodeId, thisNode), new ArrayList<>(), registry.apply(consensusGroupId), + retryService, clientManager, syncClientManager, config); @@ -149,6 +158,13 @@ public class IoTConsensus implements IConsensus { clientManager.close(); syncClientManager.close(); registerManager.deregisterAll(); + retryService.shutdown(); + try { + retryService.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e); + Thread.currentThread().interrupt(); + } } @Override @@ -214,6 +230,7 @@ public class IoTConsensus implements IConsensus { new Peer(groupId, thisNodeId, thisNode), peers, registry.apply(groupId), + retryService, clientManager, syncClientManager, config); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 0b76a690dba..92f3584705f 100644 --- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -80,6 +80,7 @@ import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -112,12 +113,14 @@ public class IoTConsensusServerImpl { private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager; private final IoTConsensusServerMetrics ioTConsensusServerMetrics; private final String consensusGroupId; + private final ScheduledExecutorService retryService; public IoTConsensusServerImpl( String storageDir, Peer thisNode, List<Peer> configuration, IStateMachine stateMachine, + ScheduledExecutorService retryService, IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager, IClientManager<TEndPoint, SyncIoTConsensusServiceClient> syncClientManager, IoTConsensusConfig config) { @@ -133,6 +136,7 @@ public class IoTConsensusServerImpl { } else { persistConfiguration(); } + this.retryService = retryService; this.config = config; this.consensusGroupId = thisNode.getGroupId().toString(); consensusReqReader = (ConsensusReqReader) stateMachine.read(new GetConsensusReqReaderPlan()); @@ -732,6 +736,10 @@ public class IoTConsensusServerImpl { return searchIndex; } + public ScheduledExecutorService getRetryService() { + return retryService; + } + public boolean isReadOnly() { return stateMachine.isReadOnly(); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java index 94ba349c6d6..f69ea0c2d76 100644 --- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java @@ -29,7 +29,7 @@ import org.apache.thrift.async.AsyncMethodCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRes> { @@ -88,31 +88,29 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe } private void sleepCorrespondingTimeAndRetryAsynchronous() { - // TODO handle forever retry - CompletableFuture.runAsync( - () -> { - try { - long defaultSleepTime = - (long) - (thread.getConfig().getReplication().getBasicRetryWaitTimeMs() - * Math.pow(2, retryCount)); - Thread.sleep( - Math.min( - defaultSleepTime, thread.getConfig().getReplication().getMaxRetryWaitTimeMs())); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - logger.warn("Unexpected interruption during retry pending batch"); - } - if (thread.isStopped()) { - logger.debug( - "LogDispatcherThread {} has been stopped, " - + "we will not retrying this Batch {} after {} times", - thread.getPeer(), - batch, - retryCount); - } else { - thread.sendBatchAsync(batch, this); - } - }); + long sleepTime = + Math.min( + (long) + (thread.getConfig().getReplication().getBasicRetryWaitTimeMs() + * Math.pow(2, retryCount)), + thread.getConfig().getReplication().getMaxRetryWaitTimeMs()); + thread + .getImpl() + .getRetryService() + .schedule( + () -> { + if (thread.isStopped()) { + logger.debug( + "LogDispatcherThread {} has been stopped, " + + "we will not retrying this Batch {} after {} times", + thread.getPeer(), + batch, + retryCount); + } else { + thread.sendBatchAsync(batch, this); + } + }, + sleepTime, + TimeUnit.MILLISECONDS); } } diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index ef88d5a8163..30a2cc2d22c 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -299,6 +299,10 @@ public class LogDispatcher { return stopped; } + public IoTConsensusServerImpl getImpl() { + return impl; + } + @Override public void run() { logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java index 27c8bb0d271..f6819b18097 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java @@ -90,7 +90,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -123,7 +122,6 @@ class RatisConsensus implements IConsensus { /** TODO make it configurable */ private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) TimeUnit.SECONDS.toMillis(20); - private final ExecutorService addExecutor; private final ScheduledExecutorService diskGuardian; private final long triggerSnapshotThreshold; @@ -153,7 +151,6 @@ class RatisConsensus implements IConsensus { this.ratisMetricSet = new RatisMetricSet(); this.triggerSnapshotThreshold = this.config.getImpl().getTriggerSnapshotFileSize(); - addExecutor = 
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName()); diskGuardian = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor( ThreadName.RATIS_BG_DISK_GUARDIAN.getName()); @@ -186,10 +183,8 @@ class RatisConsensus implements IConsensus { @Override public void stop() throws IOException { - addExecutor.shutdown(); diskGuardian.shutdown(); try { - addExecutor.awaitTermination(5, TimeUnit.SECONDS); diskGuardian.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { logger.warn("{}: interrupted when shutting down add Executor with exception {}", this, e); @@ -197,8 +192,8 @@ class RatisConsensus implements IConsensus { } finally { clientManager.close(); server.close(); + MetricService.getInstance().removeMetricSet(this.ratisMetricSet); } - MetricService.getInstance().removeMetricSet(this.ratisMetricSet); } private boolean shouldRetry(RaftClientReply reply) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index f05c19bfa48..c85043f011c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1071,7 +1071,7 @@ public class IoTDBConfig { // IoTConsensus Config private int maxLogEntriesNumPerBatch = 1024; private int maxSizePerBatch = 16 * 1024 * 1024; - private int maxPendingBatchesNum = 12; + private int maxPendingBatchesNum = 5; private double maxMemoryRatioForQueue = 0.6; /** Pipe related */ diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java index 38968b66659..73dc94de15c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java +++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java @@ -103,13 +103,14 @@ public enum ThreadName { IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"), ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"), LOG_DISPATCHER("LogDispatcher"), + + LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"), // -------------------------- Ratis -------------------------- // NOTICE: The thread name of ratis cannot be edited here! // We list the thread name here just for distinguishing what module the thread belongs to. RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"), RAFT_SERVER_EXECUTOR("\\d+-server-thread"), RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"), - RATIS_ADD("Ratis-Add"), SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"), STATE_MACHINE_UPDATER("StateMachineUpdater"), FOLLOWER_STATE("FollowerState"), @@ -243,7 +244,6 @@ public enum ThreadName { RAFT_SERVER_PROXY_EXECUTOR, RAFT_SERVER_EXECUTOR, RAFT_SERVER_CLIENT_EXECUTOR, - RATIS_ADD, SEGMENT_RAFT_WORKER, STATE_MACHINE_UPDATER, FOLLOWER_STATE,
