This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch jira3554 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d17fd4f16b41bae46973718aa2efb3f70182908a Author: LebronAl <[email protected]> AuthorDate: Mon Jun 20 19:16:57 2022 +0800 fix --- .../iotdb/consensus/config/MultiLeaderConfig.java | 2 +- .../consensus/multileader/MultiLeaderConsensus.java | 17 ++++++++++++++++- .../consensus/multileader/MultiLeaderServerImpl.java | 6 +++++- .../multileader/logdispatcher/LogDispatcher.java | 12 +++++------- 4 files changed, 27 insertions(+), 10 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java index b98df9f822..7f4c0b4de9 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java @@ -207,7 +207,7 @@ public class MultiLeaderConfig { public static class Builder { private int maxPendingRequestNumPerNode = 1000; private int maxRequestPerBatch = 100; - private int maxPendingBatch = 50; + private int maxPendingBatch = 20; private int maxWaitingTimeForAccumulatingBatchInMs = 10; private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100); private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java index fbd9b280f3..1a6480503d 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java @@ -20,6 +20,7 @@ package org.apache.iotdb.consensus.multileader; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.consensus.ConsensusGroupId; import org.apache.iotdb.commons.exception.StartupException; import org.apache.iotdb.commons.service.RegisterManager; @@ -38,6 +39,8 @@ import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException; import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException; import org.apache.iotdb.consensus.exception.IllegalPeerEndpointException; import org.apache.iotdb.consensus.exception.IllegalPeerNumException; +import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient; +import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory; import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCService; import org.apache.iotdb.consensus.multileader.service.MultiLeaderRPCServiceProcessor; @@ -67,6 +70,7 @@ public class MultiLeaderConsensus implements IConsensus { private final MultiLeaderRPCService service; private final RegisterManager registerManager = new RegisterManager(); private final MultiLeaderConfig config; + private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager; public MultiLeaderConsensus(ConsensusConfig config, Registry registry) { this.thisNode = config.getThisNode(); @@ -74,6 +78,10 @@ public class MultiLeaderConsensus implements IConsensus { this.config = config.getMultiLeaderConfig(); this.registry = registry; this.service = new MultiLeaderRPCService(thisNode, config.getMultiLeaderConfig()); + this.clientManager = + new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>() + .createClientManager( + new AsyncMultiLeaderServiceClientPoolFactory(config.getMultiLeaderConfig())); } @Override @@ -105,6 +113,7 @@ public class MultiLeaderConsensus implements IConsensus { new Peer(consensusGroupId, thisNode), new ArrayList<>(), registry.apply(consensusGroupId), + clientManager, config); stateMachineMap.put(consensusGroupId, consensus); consensus.start(); @@ -117,6 +126,7 @@ public class MultiLeaderConsensus implements IConsensus { public void stop() { stateMachineMap.values().parallelStream().forEach(MultiLeaderServerImpl::stop); registerManager.deregisterAll(); + clientManager.close(); } @Override @@ -166,7 +176,12 @@ public class MultiLeaderConsensus implements IConsensus { } MultiLeaderServerImpl impl = new MultiLeaderServerImpl( - path, new Peer(groupId, thisNode), peers, registry.apply(groupId), config); + path, + new Peer(groupId, thisNode), + peers, + registry.apply(groupId), + clientManager, + config); impl.start(); return impl; }); diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index 596e72bd53..272c244a1e 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -19,7 +19,9 @@ package org.apache.iotdb.consensus.multileader; +import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; +import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.consensus.IStateMachine; import org.apache.iotdb.consensus.common.DataSet; import org.apache.iotdb.consensus.common.Peer; @@ -27,6 +29,7 @@ import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IConsensusRequest; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.consensus.config.MultiLeaderConfig; +import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient; import org.apache.iotdb.consensus.multileader.logdispatcher.IndexController; import org.apache.iotdb.consensus.multileader.logdispatcher.LogDispatcher; import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader; @@ -63,6 +66,7 @@ public class MultiLeaderServerImpl { Peer thisNode, List<Peer> configuration, IStateMachine stateMachine, + IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager, MultiLeaderConfig config) { this.storageDir = storageDir; this.thisNode = thisNode; @@ -76,7 +80,7 @@ public class MultiLeaderServerImpl { persistConfiguration(); } this.config = config; - this.logDispatcher = new LogDispatcher(this); + this.logDispatcher = new LogDispatcher(this, clientManager); } public IStateMachine getStateMachine() { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 8a6a94969e..a2add314ae 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -29,7 +29,6 @@ import org.apache.iotdb.consensus.config.MultiLeaderConfig; import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl; import org.apache.iotdb.consensus.multileader.client.AsyncMultiLeaderServiceClient; import org.apache.iotdb.consensus.multileader.client.DispatchLogHandler; -import org.apache.iotdb.consensus.multileader.client.MultiLeaderConsensusClientPool.AsyncMultiLeaderServiceClientPoolFactory; import org.apache.iotdb.consensus.multileader.thrift.TLogBatch; import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq; import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader; @@ -60,11 +59,14 @@ public class LogDispatcher { private final MultiLeaderServerImpl impl; private final List<LogDispatcherThread> threads; + private final IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager; private ExecutorService executorService; - private IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager; - public LogDispatcher(MultiLeaderServerImpl impl) { + public LogDispatcher( + MultiLeaderServerImpl impl, + IClientManager<TEndPoint, AsyncMultiLeaderServiceClient> clientManager) { this.impl = impl; + this.clientManager = clientManager; this.threads = impl.getConfiguration().stream() .filter(x -> !Objects.equals(x, impl.getThisNode())) @@ -73,9 +75,6 @@ public class LogDispatcher { if (!threads.isEmpty()) { this.executorService = IoTDBThreadPoolFactory.newFixedThreadPool(threads.size(), "LogDispatcher"); - this.clientManager = - new IClientManager.Factory<TEndPoint, AsyncMultiLeaderServiceClient>() - .createClientManager(new AsyncMultiLeaderServiceClientPoolFactory(impl.getConfig())); } } @@ -89,7 +88,6 @@ public class LogDispatcher { if (!threads.isEmpty()) { threads.forEach(LogDispatcherThread::stop); executorService.shutdownNow(); - clientManager.close(); int timeout = 10; try { if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
