This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c0c4e75cbe [IOTDB-6051] Fixed concurrency error in IoTConsensus UT when stopping cluster (#10457)
1c0c4e75cbe is described below

commit 1c0c4e75cbe3cde1f4b467e28a22a23e2642d1ed
Author: Potato <[email protected]>
AuthorDate: Mon Jul 10 11:28:10 2023 +0800

    [IOTDB-6051] Fixed concurrency error in IoTConsensus UT when stopping cluster (#10457)
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java |  3 ++-
 .../org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java |  2 +-
 .../iotdb/consensus/iot/logdispatcher/LogDispatcher.java       | 10 ++++------
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 5cfe53459de..047d557ad55 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -145,8 +145,9 @@ public class IoTConsensus implements IConsensus {
 
   @Override
   public void stop() {
-    clientManager.close();
     
stateMachineMap.values().parallelStream().forEach(IoTConsensusServerImpl::stop);
+    clientManager.close();
+    syncClientManager.close();
     registerManager.deregisterAll();
   }
 
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
index c4d4d463ee3..0b76a690dba 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java
@@ -138,7 +138,7 @@ public class IoTConsensusServerImpl {
     consensusReqReader = (ConsensusReqReader) stateMachine.read(new 
GetConsensusReqReaderPlan());
     this.searchIndex = new 
AtomicLong(consensusReqReader.getCurrentSearchIndex());
     this.ioTConsensusServerMetrics = new IoTConsensusServerMetrics(this);
-    this.logDispatcher = new LogDispatcher(this, clientManager, 
ioTConsensusServerMetrics);
+    this.logDispatcher = new LogDispatcher(this, clientManager);
     // Since the underlying wal does not persist safelyDeletedSearchIndex, 
IoTConsensus needs to
     // update wal with its syncIndex recovered from the consensus layer when 
initializing.
     // This prevents wal from being piled up if the safelyDeletedSearchIndex 
is not updated after
diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 458faea805b..ef88d5a8163 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
 import org.apache.iotdb.consensus.config.IoTConsensusConfig;
 import org.apache.iotdb.consensus.iot.IoTConsensusServerImpl;
-import org.apache.iotdb.consensus.iot.IoTConsensusServerMetrics;
 import org.apache.iotdb.consensus.iot.client.AsyncIoTConsensusServiceClient;
 import org.apache.iotdb.consensus.iot.client.DispatchLogHandler;
 import org.apache.iotdb.consensus.iot.log.ConsensusReqReader;
@@ -70,8 +69,7 @@ public class LogDispatcher {
 
   public LogDispatcher(
       IoTConsensusServerImpl impl,
-      IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager,
-      IoTConsensusServerMetrics ioTConsensusServerMetrics) {
+      IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager) 
{
     this.impl = impl;
     this.selfPeerId = impl.getThisNode().getNodeId();
     this.clientManager = clientManager;
@@ -103,7 +101,6 @@ public class LogDispatcher {
 
   public synchronized void stop() {
     if (!threads.isEmpty()) {
-      threads.forEach(LogDispatcherThread::stop);
       executorService.shutdownNow();
       int timeout = 10;
       try {
@@ -114,6 +111,7 @@ public class LogDispatcher {
         Thread.currentThread().interrupt();
         logger.error("Unexpected Interruption when closing LogDispatcher 
service ");
       }
+      threads.forEach(LogDispatcherThread::stop);
     }
     stopped = true;
   }
@@ -224,6 +222,7 @@ public class LogDispatcher {
       this.syncStatus = new SyncStatus(controller, config);
       this.walEntryIterator = reader.getReqIterator(START_INDEX);
       this.logDispatcherThreadMetrics = new LogDispatcherThreadMetrics(this);
+      MetricService.getInstance().addMetricSet(logDispatcherThreadMetrics);
     }
 
     public IndexController getController() {
@@ -303,10 +302,9 @@ public class LogDispatcher {
     @Override
     public void run() {
       logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
-      MetricService.getInstance().addMetricSet(logDispatcherThreadMetrics);
       try {
         Batch batch;
-        while (!Thread.interrupted() && !stopped) {
+        while (!Thread.interrupted()) {
           long startTime = System.nanoTime();
           while ((batch = getBatch()).isEmpty()) {
             // we may block here if there is no requests in the queue

Reply via email to