This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new 7b36d8f6322 Use CountDownLatch to replace Semaphore in IoTConsensus 
log dispatcher closing #13517 (#13638)
7b36d8f6322 is described below

commit 7b36d8f63225bf5b62b094205fdfa13d4ebac2e4
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Sep 26 19:12:02 2024 +0800

    Use CountDownLatch to replace Semaphore in IoTConsensus log dispatcher 
closing #13517 (#13638)
---
 .../apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 68b78ab00b6..6b33fcc5dfc 100644
--- 
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,8 +46,8 @@ import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
@@ -229,7 +229,7 @@ public class LogDispatcher {
 
     private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
 
-    private Semaphore threadSemaphore = new Semaphore(0);
+    private final CountDownLatch runFinished = new CountDownLatch(1);
 
     public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long 
initialSyncIndex) {
       this.peer = peer;
@@ -311,7 +311,7 @@ public class LogDispatcher {
 
     private void processStopped() {
       try {
-        if (!threadSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
+        if (!runFinished.await(30, TimeUnit.SECONDS)) {
           logger.info("{}: Dispatcher for {} didn't stop after 30s.", 
impl.getThisNode(), peer);
         }
       } catch (InterruptedException e) {
@@ -386,7 +386,7 @@ public class LogDispatcher {
       } catch (Exception e) {
         logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
       }
-      threadSemaphore.release();
+      runFinished.countDown();
       logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
     }
 

Reply via email to