This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a8411f66837 Fix Concurrency Instability When IoTConsensus
LogDispatcher Exits (#12411)
a8411f66837 is described below
commit a8411f66837d0d6096aed4cfd30d4316ee211527
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Thu Apr 25 16:12:06 2024 +0800
Fix Concurrency Instability When IoTConsensus LogDispatcher Exits (#12411)
* add interrupts check
* remove duplicate configuration
* remove useless delete
* change removeif
* fix ut
* add configuration sort
---
.../consensus/iot/logdispatcher/LogDispatcher.java | 8 +++++
.../apache/iotdb/consensus/iot/ReplicateTest.java | 36 ++++++++++++++++++----
2 files changed, 38 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 8f08fccaa18..adf8218b46d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -365,6 +365,14 @@ public class LogDispatcher {
Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
}
}
+ // Immediately check for interrupts after poll and sleep
+ if (Thread.interrupted()) {
+ throw new InterruptedException("Interrupted after polling and
sleeping");
+ }
+ }
+ // Immediately check for interrupts after a time-consuming getBatch() operation
+ if (Thread.interrupted()) {
+ throw new InterruptedException("Interrupted after getting a
batch");
}
logDispatcherThreadMetrics.recordConstructBatchTime(System.nanoTime() -
startTime);
// we may block here if the synchronization pipeline is full
diff --git
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index d758af63dc6..5ea55283a6f 100644
---
a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++
b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -50,6 +50,7 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -113,6 +114,13 @@ public class ReplicateTest {
Paths.get(new File(storageDir,
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
Path configurationPath =
Paths.get(new File(storageDir,
CONFIGURATION_FILE_NAME).getAbsolutePath());
+ if (!Files.exists(configurationPath) &&
!Files.exists(tmpConfigurationPath)) {
+ return;
+ }
+ if (!Files.exists(tmpConfigurationPath)) {
+ Files.createDirectories(tmpConfigurationPath.getParent());
+ Files.createFile(tmpConfigurationPath);
+ }
Files.write(tmpConfigurationPath, publicBAOS.getBuf());
if (Files.exists(configurationPath)) {
Files.delete(configurationPath);
@@ -204,9 +212,17 @@ public class ReplicateTest {
stopServer();
initServer();
- Assert.assertEquals(peers,
servers.get(0).getImpl(gid).getConfiguration());
- Assert.assertEquals(peers,
servers.get(1).getImpl(gid).getConfiguration());
- Assert.assertEquals(peers,
servers.get(2).getImpl(gid).getConfiguration());
+ List<Peer> configuration =
servers.get(0).getImpl(gid).getConfiguration();
+ configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+ Assert.assertEquals(peers, configuration);
+
+ configuration = servers.get(1).getImpl(gid).getConfiguration();
+ configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+ Assert.assertEquals(peers, configuration);
+
+ configuration = servers.get(2).getImpl(gid).getConfiguration();
+ configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+ Assert.assertEquals(peers, configuration);
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getSearchIndex());
@@ -268,9 +284,17 @@ public class ReplicateTest {
servers.get(2).createLocalPeer(group.getGroupId(), group.getPeers());
- Assert.assertEquals(peers,
servers.get(0).getImpl(gid).getConfiguration());
- Assert.assertEquals(peers,
servers.get(1).getImpl(gid).getConfiguration());
- Assert.assertEquals(peers,
servers.get(2).getImpl(gid).getConfiguration());
+ List<Peer> configuration =
servers.get(0).getImpl(gid).getConfiguration();
+ configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+ Assert.assertEquals(peers, configuration);
+
+ configuration = servers.get(1).getImpl(gid).getConfiguration();
+ configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+ Assert.assertEquals(peers, configuration);
+
+ configuration = servers.get(2).getImpl(gid).getConfiguration();
+ configuration.sort(Comparator.comparingInt(Peer::getNodeId));
+ Assert.assertEquals(peers, configuration);
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(0).getImpl(gid).getSearchIndex());
Assert.assertEquals(CHECK_POINT_GAP,
servers.get(1).getImpl(gid).getSearchIndex());