This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a8411f66837 Fix Concurrency Instability When IoTConsensus LogDispatcher Exits (#12411)
a8411f66837 is described below

commit a8411f66837d0d6096aed4cfd30d4316ee211527
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Thu Apr 25 16:12:06 2024 +0800

    Fix Concurrency Instability When IoTConsensus LogDispatcher Exits (#12411)
    
    * add interrupts check
    
    * remove duplicate configuration
    
    * remove useless delete
    
    * change removeif
    
    * fix ut
    
    * add configuration sort
---
 .../consensus/iot/logdispatcher/LogDispatcher.java |  8 +++++
 .../apache/iotdb/consensus/iot/ReplicateTest.java  | 36 ++++++++++++++++++----
 2 files changed, 38 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 8f08fccaa18..adf8218b46d 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -365,6 +365,14 @@ public class LogDispatcher {
                 
Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
               }
             }
+            // Immediately check for interrupts after poll and sleep
+            if (Thread.interrupted()) {
+              throw new InterruptedException("Interrupted after polling and 
sleeping");
+            }
+          }
+          // Immediately check for interrupts after a time-consuming getBatch() operation
+          if (Thread.interrupted()) {
+            throw new InterruptedException("Interrupted after getting a 
batch");
           }
           
logDispatcherThreadMetrics.recordConstructBatchTime(System.nanoTime() - 
startTime);
           // we may block here if the synchronization pipeline is full
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index d758af63dc6..5ea55283a6f 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -50,6 +50,7 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -113,6 +114,13 @@ public class ReplicateTest {
           Paths.get(new File(storageDir, 
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
       Path configurationPath =
           Paths.get(new File(storageDir, 
CONFIGURATION_FILE_NAME).getAbsolutePath());
+      if (!Files.exists(configurationPath) && 
!Files.exists(tmpConfigurationPath)) {
+        return;
+      }
+      if (!Files.exists(tmpConfigurationPath)) {
+        Files.createDirectories(tmpConfigurationPath.getParent());
+        Files.createFile(tmpConfigurationPath);
+      }
       Files.write(tmpConfigurationPath, publicBAOS.getBuf());
       if (Files.exists(configurationPath)) {
         Files.delete(configurationPath);
@@ -204,9 +212,17 @@ public class ReplicateTest {
       stopServer();
       initServer();
 
-      Assert.assertEquals(peers, 
servers.get(0).getImpl(gid).getConfiguration());
-      Assert.assertEquals(peers, 
servers.get(1).getImpl(gid).getConfiguration());
-      Assert.assertEquals(peers, 
servers.get(2).getImpl(gid).getConfiguration());
+      List<Peer> configuration = 
servers.get(0).getImpl(gid).getConfiguration();
+      configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+      Assert.assertEquals(peers, configuration);
+
+      configuration = servers.get(1).getImpl(gid).getConfiguration();
+      configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+      Assert.assertEquals(peers, configuration);
+
+      configuration = servers.get(2).getImpl(gid).getConfiguration();
+      configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+      Assert.assertEquals(peers, configuration);
 
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getSearchIndex());
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getSearchIndex());
@@ -268,9 +284,17 @@ public class ReplicateTest {
 
       servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
 
-      Assert.assertEquals(peers, 
servers.get(0).getImpl(gid).getConfiguration());
-      Assert.assertEquals(peers, 
servers.get(1).getImpl(gid).getConfiguration());
-      Assert.assertEquals(peers, 
servers.get(2).getImpl(gid).getConfiguration());
+      List<Peer> configuration = 
servers.get(0).getImpl(gid).getConfiguration();
+      configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+      Assert.assertEquals(peers, configuration);
+
+      configuration = servers.get(1).getImpl(gid).getConfiguration();
+      configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+      Assert.assertEquals(peers, configuration);
+
+      configuration = servers.get(2).getImpl(gid).getConfiguration();
+      configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+      Assert.assertEquals(peers, configuration);
 
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(0).getImpl(gid).getSearchIndex());
       Assert.assertEquals(CHECK_POINT_GAP, 
servers.get(1).getImpl(gid).getSearchIndex());

Reply via email to