This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch jira3554
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit d17fd4f16b41bae46973718aa2efb3f70182908a
Author: LebronAl <[email protected]>
AuthorDate: Mon Jun 20 19:16:57 2022 +0800

    fix
---
 .../iotdb/consensus/config/MultiLeaderConfig.java       |  2 +-
 .../consensus/multileader/MultiLeaderConsensus.java     | 17 ++++++++++++++++-
 .../consensus/multileader/MultiLeaderServerImpl.java    |  6 +++++-
 .../multileader/logdispatcher/LogDispatcher.java        | 12 +++++-------
 4 files changed, 27 insertions(+), 10 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index b98df9f822..7f4c0b4de9 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -207,7 +207,7 @@ public class MultiLeaderConfig {
     public static class Builder {
       private int maxPendingRequestNumPerNode = 1000;
       private int maxRequestPerBatch = 100;
-      private int maxPendingBatch = 50;
+      private int maxPendingBatch = 20;
       private int maxWaitingTimeForAccumulatingBatchInMs = 10;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index fbd9b280f3..1a6480503d 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.consensus.multileader;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.commons.exception.StartupException;
 import org.apache.iotdb.commons.service.RegisterManager;
@@ -38,6 +39,8 @@ import 
org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
 import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
 import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException;
 import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import 
org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
+import 
org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
 import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService;
 import 
org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor;
 
@@ -67,6 +70,7 @@ public class MultiLeaderConsensus implements IConsensus {
   private final MultiLeaderRPCService service;
   private final RegisterManager registerManager = new RegisterManager();
   private final MultiLeaderConfig config;
+  private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> 
clientManager;
 
   public MultiLeaderConsensus(ConsensusConfig config, Registry registry) {
     this.thisNode = config.getThisNode();
@@ -74,6 +78,10 @@ public class MultiLeaderConsensus implements IConsensus {
     this.config = config.getMultiLeaderConfig();
     this.registry = registry;
     this.service = new MultiLeaderRPCService(thisNode, 
config.getMultiLeaderConfig());
+    this.clientManager =
+        new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>()
+            .createClientManager(
+                new 
AsyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig()));
   }
 
   @Override
@@ -105,6 +113,7 @@ public class MultiLeaderConsensus implements IConsensus {
                   new Peer(consensusGroupId, thisNode),
                   new ArrayList<>(),
                   registry.apply(consensusGroupId),
+                  clientManager,
                   config);
           stateMachineMap.put(consensusGroupId, consensus);
           consensus.start();
@@ -117,6 +126,7 @@ public class MultiLeaderConsensus implements IConsensus {
   public void stop() {
     
stateMachineMap.values().parallelStream().forEach(MultiLeaderServerImpl::stop);
     registerManager.deregisterAll();
+    clientManager.close();
   }
 
   @Override
@@ -166,7 +176,12 @@ public class MultiLeaderConsensus implements IConsensus {
           }
           MultiLeaderServerImpl impl =
               new MultiLeaderServerImpl(
-                  path, new Peer(groupId, thisNode), peers, 
registry.apply(groupId), config);
+                  path,
+                  new Peer(groupId, thisNode),
+                  peers,
+                  registry.apply(groupId),
+                  clientManager,
+                  config);
           impl.start();
           return impl;
         });
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 596e72bd53..272c244a1e 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -19,7 +19,9 @@
 
 package org.apache.iotdb.consensus.multileader;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.consensus.IStateMachine;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
@@ -27,6 +29,7 @@ import 
org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import 
org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController;
 import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -63,6 +66,7 @@ public class MultiLeaderServerImpl {
       Peer thisNode,
       List<Peer> configuration,
       IStateMachine stateMachine,
+      IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager,
       MultiLeaderConfig config) {
     this.storageDir = storageDir;
     this.thisNode = thisNode;
@@ -76,7 +80,7 @@ public class MultiLeaderServerImpl {
       persistConfiguration();
     }
     this.config = config;
-    this.logDispatcher = new LogDispatcher(this);
+    this.logDispatcher = new LogDispatcher(this, clientManager);
   }
 
   public IStateMachine getStateMachine() {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 8a6a94969e..a2add314ae 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.config.MultiLeaderConfig;
 import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import 
org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient;
 import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler;
-import 
org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory;
 import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
@@ -60,11 +59,14 @@ public class LogDispatcher {
 
   private final MultiLeaderServerImpl impl;
   private final List<LogDispatcherThread> threads;
+  private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> 
clientManager;
   private ExecutorService executorService;
-  private IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> 
clientManager;
 
-  public LogDispatcher(MultiLeaderServerImpl impl) {
+  public LogDispatcher(
+      MultiLeaderServerImpl impl,
+      IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) {
     this.impl = impl;
+    this.clientManager = clientManager;
     this.threads =
         impl.getConfiguration().stream()
             .filter(x -> !Objects.equals(x, impl.getThisNode()))
@@ -73,9 +75,6 @@ public class LogDispatcher {
     if (!threads.isEmpty()) {
       this.executorService =
           IoTDBThreadPoolFactory.newFixedThreadPool(threads.size(), 
"LogDispatcher");
-      this.clientManager =
-          new IClientManager.Factory<TEndPoint, 
AsyncMultiLeaderServiceClient>()
-              .createClientManager(new 
AsyncMultiLeaderServiceClientPoolFactory(impl.getConfig()));
     }
   }
 
@@ -89,7 +88,6 @@ public class LogDispatcher {
     if (!threads.isEmpty()) {
       threads.forEach(LogDispatcherThread::stop);
       executorService.shutdownNow();
-      clientManager.close();
       int timeout = 10;
       try {
         if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {

Reply via email to