This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rel/1.2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.2 by this push:
     new 1d13886a01a [To rel/1.2][IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872) (#10878)
1d13886a01a is described below

commit 1d13886a01a3cf84cd9d034446b9dff7131488ac
Author: Potato <[email protected]>
AuthorDate: Thu Aug 17 10:12:59 2023 +0800

    [To rel/1.2][IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872) (#10878)
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 17 +++++++
 .../consensus/iot/IoTConsensusServerImpl.java      |  8 ++++
 .../consensus/iot/client/DispatchLogHandler.java   | 52 +++++++++++-----------
 .../consensus/iot/logdispatcher/LogDispatcher.java |  4 ++
 .../iotdb/consensus/ratis/RatisConsensus.java      |  7 +--
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/commons/concurrent/ThreadName.java       |  6 +--
 7 files changed, 59 insertions(+), 37 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 047d557ad55..3afc4590c7b 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.consensus.iot;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
@@ -64,6 +66,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class IoTConsensus implements IConsensus {
@@ -81,6 +85,7 @@ public class IoTConsensus implements IConsensus {
   private final IoTConsensusConfig config;
   private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> 
clientManager;
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
+  private final ScheduledExecutorService retryService;
 
   public IoTConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -97,6 +102,9 @@ public class IoTConsensus implements IConsensus {
         new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
             .createClientManager(
                 new 
SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
+    this.retryService =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
     // init IoTConsensus memory manager
     IoTConsensusMemoryManager.getInstance()
         .init(
@@ -133,6 +141,7 @@ public class IoTConsensus implements IConsensus {
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   new ArrayList<>(),
                   registry.apply(consensusGroupId),
+                  retryService,
                   clientManager,
                   syncClientManager,
                   config);
@@ -149,6 +158,13 @@ public class IoTConsensus implements IConsensus {
     clientManager.close();
     syncClientManager.close();
     registerManager.deregisterAll();
+    retryService.shutdown();
+    try {
+      retryService.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      logger.warn("{}: interrupted when shutting down add Executor with 
exception {}", this, e);
+      Thread.currentThread().interrupt();
+    }
   }
 
   @Override
@@ -214,6 +230,7 @@ public class IoTConsensus implements IConsensus {
                   new Peer(groupId, thisNodeId, thisNode),
                   peers,
                   registry.apply(groupId),
+                  retryService,
                   clientManager,
                   syncClientManager,
                   config);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 0b76a690dba..92f3584705f 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -80,6 +80,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -112,12 +113,14 @@ public class IoTConsensusServerImpl {
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
   private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
   private final String consensusGroupId;
+  private final ScheduledExecutorService retryService;
 
   public IoTConsensusServerImpl(
       String storageDir,
       Peer thisNode,
       List<Peer> configuration,
       IStateMachine stateMachine,
+      ScheduledExecutorService retryService,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
       IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager,
       IoTConsensusConfig config) {
@@ -133,6 +136,7 @@ public class IoTConsensusServerImpl {
     } else {
       persistConfiguration();
     }
+    this.retryService = retryService;
     this.config = config;
     this.consensusGroupId = thisNode.getGroupId().toString();
     consensusReqReader = (ConsensusReqReader) stateMachine.read(new 
GetConsensusReqReaderPlan());
@@ -732,6 +736,10 @@ public class IoTConsensusServerImpl {
     return searchIndex;
   }
 
+  public ScheduledExecutorService getRetryService() {
+    return retryService;
+  }
+
   public boolean isReadOnly() {
     return stateMachine.isReadOnly();
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 94ba349c6d6..f69ea0c2d76 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -29,7 +29,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRes> {
 
@@ -88,31 +88,29 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRe
   }
 
   private void sleepCorrespondingTimeAndRetryAsynchronous() {
-    // TODO handle forever retry
-    CompletableFuture.runAsync(
-        () -> {
-          try {
-            long defaultSleepTime =
-                (long)
-                    
(thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
-                        * Math.pow(2, retryCount));
-            Thread.sleep(
-                Math.min(
-                    defaultSleepTime, 
thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            logger.warn("Unexpected interruption during retry pending batch");
-          }
-          if (thread.isStopped()) {
-            logger.debug(
-                "LogDispatcherThread {} has been stopped, "
-                    + "we will not retrying this Batch {} after {} times",
-                thread.getPeer(),
-                batch,
-                retryCount);
-          } else {
-            thread.sendBatchAsync(batch, this);
-          }
-        });
+    long sleepTime =
+        Math.min(
+            (long)
+                (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
+                    * Math.pow(2, retryCount)),
+            thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
+    thread
+        .getImpl()
+        .getRetryService()
+        .schedule(
+            () -> {
+              if (thread.isStopped()) {
+                logger.debug(
+                    "LogDispatcherThread {} has been stopped, "
+                        + "we will not retrying this Batch {} after {} times",
+                    thread.getPeer(),
+                    batch,
+                    retryCount);
+              } else {
+                thread.sendBatchAsync(batch, this);
+              }
+            },
+            sleepTime,
+            TimeUnit.MILLISECONDS);
   }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index ef88d5a8163..30a2cc2d22c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -299,6 +299,10 @@ public class LogDispatcher {
       return stopped;
     }
 
+    public IoTConsensusServerImpl getImpl() {
+      return impl;
+    }
+
     @Override
     public void run() {
       logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index 27c8bb0d271..f6819b18097 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -90,7 +90,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -123,7 +122,6 @@ class RatisConsensus implements IConsensus {
   /** TODO make it configurable */
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
-  private final ExecutorService addExecutor;
   private final ScheduledExecutorService diskGuardian;
   private final long triggerSnapshotThreshold;
 
@@ -153,7 +151,6 @@ class RatisConsensus implements IConsensus {
     this.ratisMetricSet = new RatisMetricSet();
 
     this.triggerSnapshotThreshold = 
this.config.getImpl().getTriggerSnapshotFileSize();
-    addExecutor = 
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName());
     diskGuardian =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
             ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
@@ -186,10 +183,8 @@ class RatisConsensus implements IConsensus {
 
   @Override
   public void stop() throws IOException {
-    addExecutor.shutdown();
     diskGuardian.shutdown();
     try {
-      addExecutor.awaitTermination(5, TimeUnit.SECONDS);
       diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       logger.warn("{}: interrupted when shutting down add Executor with 
exception {}", this, e);
@@ -197,8 +192,8 @@ class RatisConsensus implements IConsensus {
     } finally {
       clientManager.close();
       server.close();
+      MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
     }
-    MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
   }
 
   private boolean shouldRetry(RaftClientReply reply) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index f05c19bfa48..c85043f011c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1071,7 +1071,7 @@ public class IoTDBConfig {
   // IoTConsensus Config
   private int maxLogEntriesNumPerBatch = 1024;
   private int maxSizePerBatch = 16 * 1024 * 1024;
-  private int maxPendingBatchesNum = 12;
+  private int maxPendingBatchesNum = 5;
   private double maxMemoryRatioForQueue = 0.6;
 
   /** Pipe related */
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 38968b66659..3d26b9e8ee7 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,13 +103,13 @@ public enum ThreadName {
   IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
   
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"),
   LOG_DISPATCHER("LogDispatcher"),
+  LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"),
   // -------------------------- Ratis --------------------------
   // NOTICE: The thread name of ratis cannot be edited here!
   // We list the thread name here just for distinguishing what module the 
thread belongs to.
   RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"),
   RAFT_SERVER_EXECUTOR("\\d+-server-thread"),
   RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"),
-  RATIS_ADD("Ratis-Add"),
   SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"),
   STATE_MACHINE_UPDATER("StateMachineUpdater"),
   FOLLOWER_STATE("FollowerState"),
@@ -235,7 +235,8 @@ public enum ThreadName {
               IOT_CONSENSUS_RPC_SERVICE,
               IOT_CONSENSUS_RPC_PROCESSOR,
               ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL,
-              LOG_DISPATCHER));
+              LOG_DISPATCHER,
+              LOG_DISPATCHER_RETRY_EXECUTOR));
 
   private static final Set<ThreadName> ratisThreadNames =
       new HashSet<>(
@@ -243,7 +244,6 @@ public enum ThreadName {
               RAFT_SERVER_PROXY_EXECUTOR,
               RAFT_SERVER_EXECUTOR,
               RAFT_SERVER_CLIENT_EXECUTOR,
-              RATIS_ADD,
               SEGMENT_RAFT_WORKER,
               STATE_MACHINE_UPDATER,
               FOLLOWER_STATE,

Reply via email to