This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0729_test by this push:
new 6f9e8b5612 change condition to native wait notify
6f9e8b5612 is described below
commit 6f9e8b56128a329307f9b94c640e25ea20082faf
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Aug 1 14:40:12 2022 +0800
change condition to native wait notify
---
.../statemachine/DataRegionStateMachine.java | 22 +++++-----------------
1 file changed, 5 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index c9c11bf48c..fbb1020e8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -52,9 +52,6 @@ import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -68,8 +65,6 @@ public class DataRegionStateMachine extends BaseStateMachine {
private static final int MAX_REQUEST_CACHE_SIZE = 5;
private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
private final PriorityQueue<InsertNodeWrapper> requestCache;
- private final ReentrantLock lock = new ReentrantLock();
- private final Condition cacheCondition = lock.newCondition();
public DataRegionStateMachine(DataRegion region) {
this.region = region;
@@ -119,29 +114,22 @@ public class DataRegionStateMachine extends BaseStateMachine {
}
private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode insertNode) {
- lock.lock();
- try {
+
+ synchronized (requestCache) {
requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
- cacheCondition.signalAll();
+ requestCache.notifyAll();
while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
&& requestCache.peek().getSyncIndex() == syncIndex)) {
try {
- boolean timeoutTriggered =
- !cacheCondition.await(CACHE_WINDOW_TIME_IN_MS, TimeUnit.MILLISECONDS);
- if (timeoutTriggered) {
- logger.warn("wait next write request timeout. {}", requestCache.peek().syncIndex);
- break;
- }
+ requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
- cacheCondition.signalAll();
+ requestCache.notifyAll();
InsertNodeWrapper wrapper = requestCache.poll();
logger.info("queue size {}, syncIndex = {}", requestCache.size(), wrapper.getSyncIndex());
return wrapper.getInsertNode();
- } finally {
- lock.unlock();
}
}