This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 55998590366 Use CountDownLatch to replace Semaphore in IoTConsensus log dispatcher closing #13517
55998590366 is described below

commit 55998590366c86c9080437ba87368ff582684529
Author: Li Yu Heng <[email protected]>
AuthorDate: Sat Sep 14 19:06:32 2024 +0800

    Use CountDownLatch to replace Semaphore in IoTConsensus log dispatcher closing #13517
---
 .../apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 9ffb29e77f3..a62bdd4e132 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,8 +46,8 @@ import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -228,7 +228,7 @@ public class LogDispatcher {
 
     private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
 
-    private Semaphore threadSemaphore = new Semaphore(0);
+    private final CountDownLatch runFinished = new CountDownLatch(1);
 
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long initialSyncIndex) {
       this.peer = peer;
@@ -302,7 +302,7 @@ public class LogDispatcher {
     public void stop() {
       stopped = true;
       try {
-        if (!threadSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
+        if (!runFinished.await(30, TimeUnit.SECONDS)) {
           logger.info("{}: Dispatcher for {} didn't stop after 30s.", impl.getThisNode(), peer);
         }
       } catch (InterruptedException e) {
@@ -377,7 +377,7 @@ public class LogDispatcher {
       } catch (Exception e) {
         logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
       }
-      threadSemaphore.release();
+      runFinished.countDown();
       logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
     }
 

Reply via email to