This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new 7b36d8f6322 Use CountDownLatch to replace Semaphore in IoTConsensus
log dispatcher closing #13517 (#13638)
7b36d8f6322 is described below
commit 7b36d8f63225bf5b62b094205fdfa13d4ebac2e4
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Sep 26 19:12:02 2024 +0800
Use CountDownLatch to replace Semaphore in IoTConsensus log dispatcher
closing #13517 (#13638)
---
.../apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 68b78ab00b6..6b33fcc5dfc 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -46,8 +46,8 @@ import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -229,7 +229,7 @@ public class LogDispatcher {
private final LogDispatcherThreadMetrics logDispatcherThreadMetrics;
- private Semaphore threadSemaphore = new Semaphore(0);
+ private final CountDownLatch runFinished = new CountDownLatch(1);
public LogDispatcherThread(Peer peer, IoTConsensusConfig config, long
initialSyncIndex) {
this.peer = peer;
@@ -311,7 +311,7 @@ public class LogDispatcher {
private void processStopped() {
try {
- if (!threadSemaphore.tryAcquire(30, TimeUnit.SECONDS)) {
+ if (!runFinished.await(30, TimeUnit.SECONDS)) {
logger.info("{}: Dispatcher for {} didn't stop after 30s.",
impl.getThisNode(), peer);
}
} catch (InterruptedException e) {
@@ -386,7 +386,7 @@ public class LogDispatcher {
} catch (Exception e) {
logger.error("Unexpected error in logDispatcher for peer {}", peer, e);
}
- threadSemaphore.release();
+ runFinished.countDown();
logger.info("{}: Dispatcher for {} exits", impl.getThisNode(), peer);
}