This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0729_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0729_test by this push:
     new 6f9e8b5612 change condition to native wait notify
6f9e8b5612 is described below

commit 6f9e8b56128a329307f9b94c640e25ea20082faf
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Aug 1 14:40:12 2022 +0800

    change condition to native wait notify
---
 .../statemachine/DataRegionStateMachine.java       | 22 +++++-----------------
 1 file changed, 5 insertions(+), 17 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index c9c11bf48c..fbb1020e8f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -52,9 +52,6 @@ import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.PriorityQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -68,8 +65,6 @@ public class DataRegionStateMachine extends BaseStateMachine {
   private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
   private final PriorityQueue<InsertNodeWrapper> requestCache;
-  private final ReentrantLock lock = new ReentrantLock();
-  private final Condition cacheCondition = lock.newCondition();
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
@@ -119,29 +114,22 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
   }
 
   private InsertNode cacheAndGetLatestInsertNode(long syncIndex, InsertNode 
insertNode) {
-    lock.lock();
-    try {
+
+    synchronized (requestCache) {
       requestCache.add(new InsertNodeWrapper(syncIndex, insertNode));
-      cacheCondition.signalAll();
+      requestCache.notifyAll();
       while (!(requestCache.size() >= MAX_REQUEST_CACHE_SIZE
           && requestCache.peek().getSyncIndex() == syncIndex)) {
         try {
-          boolean timeoutTriggered =
-              !cacheCondition.await(CACHE_WINDOW_TIME_IN_MS, 
TimeUnit.MILLISECONDS);
-          if (timeoutTriggered) {
-            logger.warn("wait next write request timeout. {}", 
requestCache.peek().syncIndex);
-            break;
-          }
+          requestCache.wait(CACHE_WINDOW_TIME_IN_MS);
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
         }
       }
-      cacheCondition.signalAll();
+      requestCache.notifyAll();
       InsertNodeWrapper wrapper = requestCache.poll();
       logger.info("queue size {}, syncIndex = {}", requestCache.size(), 
wrapper.getSyncIndex());
       return wrapper.getInsertNode();
-    } finally {
-      lock.unlock();
     }
   }
 

Reply via email to