This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch TryFixExportCSV
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 461b140dc4f708f4a1d516e18dc2673da4db803f
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Aug 22 20:43:26 2023 +0800

    Revert "[IOTDB-6116] Disassociate the IoTConsensus retry logic from the forkjoinPool (#10872)"
    
    This reverts commit b445540475a046b657045fa0f76d0b71ab983875.
---
 .../apache/iotdb/consensus/iot/IoTConsensus.java   | 17 -------
 .../consensus/iot/IoTConsensusServerImpl.java      |  8 ----
 .../consensus/iot/client/DispatchLogHandler.java   | 52 +++++++++++-----------
 .../consensus/iot/logdispatcher/LogDispatcher.java |  4 --
 .../iotdb/consensus/ratis/RatisConsensus.java      |  7 ++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/commons/concurrent/ThreadName.java       |  6 +--
 7 files changed, 37 insertions(+), 59 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 3afc4590c7b..047d557ad55 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -22,8 +22,6 @@ package org.apache.iotdb.consensus.iot;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
-import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
-import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
@@ -66,8 +64,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class IoTConsensus implements IConsensus {
@@ -85,7 +81,6 @@ public class IoTConsensus implements IConsensus {
   private final IoTConsensusConfig config;
   private final IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> 
clientManager;
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
-  private final ScheduledExecutorService retryService;
 
   public IoTConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNodeEndPoint();
@@ -102,9 +97,6 @@ public class IoTConsensus implements IConsensus {
         new IClientManager.Factory<TEndPoint, SyncIoTConsensusServiceClient>()
             .createClientManager(
                 new 
SyncIoTConsensusServiceClientPoolFactory(config.getIoTConsensusConfig()));
-    this.retryService =
-        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
-            ThreadName.LOG_DISPATCHER_RETRY_EXECUTOR.getName());
     // init IoTConsensus memory manager
     IoTConsensusMemoryManager.getInstance()
         .init(
@@ -141,7 +133,6 @@ public class IoTConsensus implements IConsensus {
                   new Peer(consensusGroupId, thisNodeId, thisNode),
                   new ArrayList<>(),
                   registry.apply(consensusGroupId),
-                  retryService,
                   clientManager,
                   syncClientManager,
                   config);
@@ -158,13 +149,6 @@ public class IoTConsensus implements IConsensus {
     clientManager.close();
     syncClientManager.close();
     registerManager.deregisterAll();
-    retryService.shutdown();
-    try {
-      retryService.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (InterruptedException e) {
-      logger.warn("{}: interrupted when shutting down add Executor with 
exception {}", this, e);
-      Thread.currentThread().interrupt();
-    }
   }
 
   @Override
@@ -230,7 +214,6 @@ public class IoTConsensus implements IConsensus {
                   new Peer(groupId, thisNodeId, thisNode),
                   peers,
                   registry.apply(groupId),
-                  retryService,
                   clientManager,
                   syncClientManager,
                   config);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index 92f3584705f..0b76a690dba 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -80,7 +80,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Condition;
@@ -113,14 +112,12 @@ public class IoTConsensusServerImpl {
   private final IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager;
   private final IoTConsensusServerMetrics ioTConsensusServerMetrics;
   private final String consensusGroupId;
-  private final ScheduledExecutorService retryService;
 
   public IoTConsensusServerImpl(
       String storageDir,
       Peer thisNode,
       List<Peer> configuration,
       IStateMachine stateMachine,
-      ScheduledExecutorService retryService,
       IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
       IClientManager<TEndPoint, SyncIoTConsensusServiceClient> 
syncClientManager,
       IoTConsensusConfig config) {
@@ -136,7 +133,6 @@ public class IoTConsensusServerImpl {
     } else {
       persistConfiguration();
     }
-    this.retryService = retryService;
     this.config = config;
     this.consensusGroupId = thisNode.getGroupId().toString();
     consensusReqReader = (ConsensusReqReader) stateMachine.read(new 
GetConsensusReqReaderPlan());
@@ -736,10 +732,6 @@ public class IoTConsensusServerImpl {
     return searchIndex;
   }
 
-  public ScheduledExecutorService getRetryService() {
-    return retryService;
-  }
-
   public boolean isReadOnly() {
     return stateMachine.isReadOnly();
   }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index f69ea0c2d76..94ba349c6d6 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -29,7 +29,7 @@ import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 
 public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogEntriesRes> {
 
@@ -88,29 +88,31 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogEntriesRe
   }
 
   private void sleepCorrespondingTimeAndRetryAsynchronous() {
-    long sleepTime =
-        Math.min(
-            (long)
-                (thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
-                    * Math.pow(2, retryCount)),
-            thread.getConfig().getReplication().getMaxRetryWaitTimeMs());
-    thread
-        .getImpl()
-        .getRetryService()
-        .schedule(
-            () -> {
-              if (thread.isStopped()) {
-                logger.debug(
-                    "LogDispatcherThread {} has been stopped, "
-                        + "we will not retrying this Batch {} after {} times",
-                    thread.getPeer(),
-                    batch,
-                    retryCount);
-              } else {
-                thread.sendBatchAsync(batch, this);
-              }
-            },
-            sleepTime,
-            TimeUnit.MILLISECONDS);
+    // TODO handle forever retry
+    CompletableFuture.runAsync(
+        () -> {
+          try {
+            long defaultSleepTime =
+                (long)
+                    
(thread.getConfig().getReplication().getBasicRetryWaitTimeMs()
+                        * Math.pow(2, retryCount));
+            Thread.sleep(
+                Math.min(
+                    defaultSleepTime, 
thread.getConfig().getReplication().getMaxRetryWaitTimeMs()));
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.warn("Unexpected interruption during retry pending batch");
+          }
+          if (thread.isStopped()) {
+            logger.debug(
+                "LogDispatcherThread {} has been stopped, "
+                    + "we will not retrying this Batch {} after {} times",
+                thread.getPeer(),
+                batch,
+                retryCount);
+          } else {
+            thread.sendBatchAsync(batch, this);
+          }
+        });
   }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 30a2cc2d22c..ef88d5a8163 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -299,10 +299,6 @@ public class LogDispatcher {
       return stopped;
     }
 
-    public IoTConsensusServerImpl getImpl() {
-      return impl;
-    }
-
     @Override
     public void run() {
       logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index d73964c7002..055db6c0bb1 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -93,6 +93,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -125,6 +126,7 @@ class RatisConsensus implements IConsensus {
   /** TODO make it configurable */
   private static final int DEFAULT_WAIT_LEADER_READY_TIMEOUT = (int) 
TimeUnit.SECONDS.toMillis(20);
 
+  private final ExecutorService addExecutor;
   private final ScheduledExecutorService diskGuardian;
   private final long triggerSnapshotThreshold;
 
@@ -154,6 +156,7 @@ class RatisConsensus implements IConsensus {
     this.ratisMetricSet = new RatisMetricSet();
 
     this.triggerSnapshotThreshold = 
this.config.getImpl().getTriggerSnapshotFileSize();
+    addExecutor = 
IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.RATIS_ADD.getName());
     diskGuardian =
         IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
             ThreadName.RATIS_BG_DISK_GUARDIAN.getName());
@@ -186,8 +189,10 @@ class RatisConsensus implements IConsensus {
 
   @Override
   public void stop() throws IOException {
+    addExecutor.shutdown();
     diskGuardian.shutdown();
     try {
+      addExecutor.awaitTermination(5, TimeUnit.SECONDS);
       diskGuardian.awaitTermination(5, TimeUnit.SECONDS);
     } catch (InterruptedException e) {
       logger.warn("{}: interrupted when shutting down add Executor with 
exception {}", this, e);
@@ -195,8 +200,8 @@ class RatisConsensus implements IConsensus {
     } finally {
       clientManager.close();
       server.close();
-      MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
     }
+    MetricService.getInstance().removeMetricSet(this.ratisMetricSet);
   }
 
   private boolean shouldRetry(RaftClientReply reply) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 7ffd5a97d5d..c7d0dedaf53 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1070,7 +1070,7 @@ public class IoTDBConfig {
   // IoTConsensus Config
   private int maxLogEntriesNumPerBatch = 1024;
   private int maxSizePerBatch = 16 * 1024 * 1024;
-  private int maxPendingBatchesNum = 5;
+  private int maxPendingBatchesNum = 12;
   private double maxMemoryRatioForQueue = 0.6;
 
   /** Pipe related */
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index ebc7a2ef511..78426603fcb 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -103,13 +103,13 @@ public enum ThreadName {
   IOT_CONSENSUS_RPC_PROCESSOR("IoTConsensusRPC-Processor"),
   
ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL("AsyncDataNodeIoTConsensusServiceClientPool"),
   LOG_DISPATCHER("LogDispatcher"),
-  LOG_DISPATCHER_RETRY_EXECUTOR("LogDispatcherRetryExecutor"),
   // -------------------------- Ratis --------------------------
   // NOTICE: The thread name of ratis cannot be edited here!
   // We list the thread name here just for distinguishing what module the 
thread belongs to.
   RAFT_SERVER_PROXY_EXECUTOR("\\d+-impl-thread"),
   RAFT_SERVER_EXECUTOR("\\d+-server-thread"),
   RAFT_SERVER_CLIENT_EXECUTOR("\\d+-client-thread"),
+  RATIS_ADD("Ratis-Add"),
   SEGMENT_RAFT_WORKER("SegmentedRaftLogWorker"),
   STATE_MACHINE_UPDATER("StateMachineUpdater"),
   FOLLOWER_STATE("FollowerState"),
@@ -236,8 +236,7 @@ public enum ThreadName {
               IOT_CONSENSUS_RPC_SERVICE,
               IOT_CONSENSUS_RPC_PROCESSOR,
               ASYNC_DATANODE_IOT_CONSENSUS_CLIENT_POOL,
-              LOG_DISPATCHER,
-              LOG_DISPATCHER_RETRY_EXECUTOR));
+              LOG_DISPATCHER));
 
   private static final Set<ThreadName> ratisThreadNames =
       new HashSet<>(
@@ -245,6 +244,7 @@ public enum ThreadName {
               RAFT_SERVER_PROXY_EXECUTOR,
               RAFT_SERVER_EXECUTOR,
               RAFT_SERVER_CLIENT_EXECUTOR,
+              RATIS_ADD,
               SEGMENT_RAFT_WORKER,
               STATE_MACHINE_UPDATER,
               FOLLOWER_STATE,

Reply via email to