This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cf0d3dd93e3 Fix Concurrency Instability When IoTConsensus LogDispatcher Exits #12655
cf0d3dd93e3 is described below

commit cf0d3dd93e376476515745be68e90c1c722ed126
Author: Xiangpeng Hu <[email protected]>
AuthorDate: Tue Jun 4 21:00:14 2024 +0800

    Fix Concurrency Instability When IoTConsensus LogDispatcher Exits #12655
---
 .../consensus/iot/logdispatcher/LogDispatcher.java | 34 +++++-------
 .../apache/iotdb/consensus/iot/ReplicateTest.java  | 62 ++++------------------
 2 files changed, 21 insertions(+), 75 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index d7424f136d7..f5de226cc41 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,12 +46,9 @@ import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
@@ -101,7 +98,7 @@ public class LogDispatcher {
 
   public synchronized void start() {
     if (!threads.isEmpty()) {
-      threads.forEach(thread -> 
thread.setFuture(executorService.submit(thread)));
+      threads.forEach(executorService::submit);
     }
   }
 
@@ -133,7 +130,7 @@ public class LogDispatcher {
     if (this.executorService == null) {
       initLogSyncThreadPool();
     }
-    thread.setFuture(executorService.submit(thread));
+    executorService.submit(thread);
   }
 
   public synchronized void removeLogDispatcherThread(Peer peer) throws 
IOException {
@@ -231,7 +228,7 @@ public class LogDispatcher {
 
     private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
 
-    private Future<?> future;
+    private Semaphore threadSemaphore = new Semaphore(0);
 
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long 
initialSyncIndex) {
       this.peer = peer;
@@ -257,10 +254,6 @@ public class LogDispatcher {
       return controller.getCurrentIndex();
     }
 
-    public void setFuture(Future<?> future) {
-      this.future = future;
-    }
-
     public long getLastFlushedSyncIndex() {
       return controller.getLastFlushedIndex();
     }
@@ -308,16 +301,12 @@ public class LogDispatcher {
 
     public void stop() {
       stopped = true;
-      if (!future.cancel(true)) {
-        logger.warn("LogDispatcherThread Future for {} is not stopped", peer);
-      }
       try {
-        future.get(30, TimeUnit.SECONDS);
-      } catch (InterruptedException | ExecutionException | TimeoutException e) 
{
+        if (!threadSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
+          logger.error("{}: Dispatcher for {} didn't stop after 30s.", 
impl.getThisNode(), peer);
+        }
+      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.warn("LogDispatcherThread Future for {} is not stopped", peer, 
e);
-      } catch (CancellationException ignored) {
-        // ignore because it is expected
       }
       long requestSize = 0;
       for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
@@ -351,7 +340,7 @@ public class LogDispatcher {
       logger.info("{}: Dispatcher for {} starts", impl.getThisNode(), peer);
       try {
         Batch batch;
-        while (!Thread.interrupted()) {
+        while (!Thread.interrupted() && !stopped) {
           long startTime = System.nanoTime();
           while ((batch = getBatch()).isEmpty()) {
             // we may block here if there is no requests in the queue
@@ -366,12 +355,12 @@ public class LogDispatcher {
               }
             }
             // Immediately check for interrupts after poll and sleep
-            if (Thread.interrupted()) {
+            if (Thread.interrupted() || stopped) {
               throw new InterruptedException("Interrupted after polling and 
sleeping");
             }
           }
           // Immediately check for interrupts after a time-consuming 
getBatch() operation
-          if (Thread.interrupted()) {
+          if (Thread.interrupted() || stopped) {
             throw new InterruptedException("Interrupted after getting a 
batch");
           }
           
logDispatcherThreadMetrics.recordConstructBatchTime(System.nanoTime() - 
startTime);
@@ -388,6 +377,7 @@ public class LogDispatcher {
       } catch (Exception e) {
         logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
       }
+      threadSemaphore.release();
       logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
     }
 
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
index 610692a73e1..6e2326dd546 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/ReplicateTest.java
@@ -33,21 +33,17 @@ import org.apache.iotdb.consensus.iot.util.TestEntry;
 import org.apache.iotdb.consensus.iot.util.TestStateMachine;
 
 import org.apache.ratis.util.FileUtils;
-import org.apache.tsfile.utils.PublicBAOS;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
 import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -102,42 +98,10 @@ public class ReplicateTest {
     }
   }
 
-  public void changeConfiguration(int i) {
-    try (PublicBAOS publicBAOS = new PublicBAOS();
-        DataOutputStream outputStream = new DataOutputStream(publicBAOS)) {
-      outputStream.writeInt(this.peers.size());
-      for (Peer peer : this.peers) {
-        peer.serialize(outputStream);
-      }
-      File storageDir = new 
File(IoTConsensus.buildPeerDir(peersStorage.get(i), gid));
-      Path tmpConfigurationPath =
-          Paths.get(new File(storageDir, 
CONFIGURATION_TMP_FILE_NAME).getAbsolutePath());
-      Path configurationPath =
-          Paths.get(new File(storageDir, 
CONFIGURATION_FILE_NAME).getAbsolutePath());
-      if (!Files.exists(configurationPath) && 
!Files.exists(tmpConfigurationPath)) {
-        return;
-      }
-      if (!Files.exists(tmpConfigurationPath)) {
-        Files.createDirectories(tmpConfigurationPath.getParent());
-        Files.createFile(tmpConfigurationPath);
-      }
-      Files.write(tmpConfigurationPath, publicBAOS.getBuf());
-      if (Files.exists(configurationPath)) {
-        Files.delete(configurationPath);
-      }
-      Files.move(tmpConfigurationPath, configurationPath);
-    } catch (IOException e) {
-      logger.error("Unexpected error occurs when persisting configuration", e);
-    }
-  }
-
   private void initServer() throws IOException {
-    for (int i = 0; i < peers.size(); i++) {
-      findPortAvailable(i);
-    }
+    Assume.assumeTrue(checkPortAvailable());
     for (int i = 0; i < peers.size(); i++) {
       int finalI = i;
-      changeConfiguration(i);
       servers.add(
           (IoTConsensus)
               ConsensusFactory.getConsensusImpl(
@@ -363,23 +327,15 @@ public class ReplicateTest {
     }
   }
 
-  private void findPortAvailable(int i) {
-    long start = System.currentTimeMillis();
-    while (System.currentTimeMillis() - start < timeout) {
-      try (ServerSocket ignored = new 
ServerSocket(this.peers.get(i).getEndpoint().port)) {
-        // success
-        return;
+  private boolean checkPortAvailable() {
+    for (Peer peer : this.peers) {
+      try (ServerSocket ignored = new ServerSocket(peer.getEndpoint().port)) {
+        logger.info("check port {} success for node {}", 
peer.getEndpoint().port, peer.getNodeId());
       } catch (IOException e) {
-        // Port is already in use, wait and retry
-        this.peers.set(i, new Peer(gid, i + 1, new TEndPoint("127.0.0.1", 
this.basePort)));
-        logger.info("try port {} for node {}.", this.basePort++, i + 1);
-        try {
-          Thread.sleep(50); // Wait for 1 second before retrying
-        } catch (InterruptedException ex) {
-          // Handle the interruption if needed
-        }
+        logger.error("check port {} failed for node {}", 
peer.getEndpoint().port, peer.getNodeId());
+        return false;
       }
     }
-    Assert.fail(String.format("can not find port for node %d after 300s", i + 
1));
+    return true;
   }
 }

Reply via email to