This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch fix-wal-roll-file-test-wait
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0f58e8cf6a66c3a6c71401b9536b4ba605bfddb9
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jun 10 19:26:44 2026 +0800

    Fix WAL roll file test flush wait
---
 .../wal/node/WALNodeWaitForRollFileTest.java       | 99 ++++++++++++----------
 1 file changed, 52 insertions(+), 47 deletions(-)

diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
index b24a8cd29cf..e50d8cdeb25 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/wal/node/WALNodeWaitForRollFileTest.java
@@ -31,6 +31,8 @@ import org.apache.iotdb.db.storageengine.StorageEngine;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.AbstractResultListener.Status;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
 
@@ -40,7 +42,6 @@ import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.write.schema.MeasurementSchema;
-import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -55,6 +56,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -124,12 +126,11 @@ public class WALNodeWaitForRollFileTest {
     // write a small amount of data (not enough to trigger roll)
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {1});
     insertTabletNode.setSearchIndex(1);
-    walNode.log(
-        memTable.getMemTableId(),
-        insertTabletNode,
-        Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
-
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
+    waitForFlush(
+        walNode.log(
+            memTable.getMemTableId(),
+            insertTabletNode,
+            Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()})));
 
     // data is flushed to buffer but no WAL file roll happened yet, iterator at search index 1
     // should not find data (because the current-writing WAL file is not readable by the iterator)
@@ -160,17 +161,15 @@ public class WALNodeWaitForRollFileTest {
     for (int i = 1; i <= 5; i++) {
       InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {i});
       insertTabletNode.setSearchIndex(i);
-      walNode.log(
-          memTable.getMemTableId(),
-          insertTabletNode,
-          Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
+      waitForFlush(
+          walNode.log(
+              memTable.getMemTableId(),
+              insertTabletNode,
+              Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()})));
     }
 
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
-
     // roll the WAL file so the data is in a closed file readable by the iterator
     walNode.rollWALFile();
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
 
     // iterator at search index 1 should find the data after roll
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
@@ -185,16 +184,15 @@ public class WALNodeWaitForRollFileTest {
 
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {1});
     insertTabletNode.setSearchIndex(1);
-    walNode.log(
-        memTable.getMemTableId(),
-        insertTabletNode,
-        Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
-    walNode.log(memTable.getMemTableId(), new 
ContinuousSameSearchIndexSeparatorNode());
-
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
+    waitForFlush(
+        walNode.log(
+            memTable.getMemTableId(),
+            insertTabletNode,
+            Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()})));
+    waitForFlush(
+        walNode.log(memTable.getMemTableId(), new 
ContinuousSameSearchIndexSeparatorNode()));
 
     walNode.rollWALFile();
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
 
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
     assertTrue(iterator.hasNext());
@@ -214,12 +212,11 @@ public class WALNodeWaitForRollFileTest {
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {1});
     insertTabletNode.setSearchIndex(1);
     insertTabletNode.setLastFragment(true);
-    walNode.log(
-        memTable.getMemTableId(),
-        insertTabletNode,
-        Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
-
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
+    waitForFlush(
+        walNode.log(
+            memTable.getMemTableId(),
+            insertTabletNode,
+            Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()})));
 
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
 
@@ -246,7 +243,6 @@ public class WALNodeWaitForRollFileTest {
 
     // trigger WAL file roll — this should signal rollLogWriterCondition and wake up the iterator
     walNode.rollWALFile();
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
 
     waitFuture.get(20, TimeUnit.SECONDS);
     executor.shutdown();
@@ -274,14 +270,11 @@ public class WALNodeWaitForRollFileTest {
       // write initial data with search index
       InsertTabletNode first = getInsertTabletNode(devicePath, new long[] {1});
       first.setSearchIndex(1);
-      walNode.log(
-          memTable.getMemTableId(),
-          first,
-          Collections.singletonList(new int[] {0, first.getRowCount()}));
-
-      Awaitility.await()
-          .atMost(10, TimeUnit.SECONDS)
-          .until(() -> walNode.isAllWALEntriesConsumed());
+      waitForFlush(
+          walNode.log(
+              memTable.getMemTableId(),
+              first,
+              Collections.singletonList(new int[] {0, first.getRowCount()})));
 
       ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
 
@@ -305,14 +298,17 @@ public class WALNodeWaitForRollFileTest {
       Thread.sleep(500);
 
       // write more data to exceed the small threshold and trigger auto-roll
+      WALFlushListener lastFlushListener = null;
       for (int i = 2; i <= 50; i++) {
         InsertTabletNode node = getInsertTabletNode(devicePath, new long[] 
{i});
         node.setSearchIndex(i);
-        walNode.log(
-            memTable.getMemTableId(),
-            node,
-            Collections.singletonList(new int[] {0, node.getRowCount()}));
+        lastFlushListener =
+            walNode.log(
+                memTable.getMemTableId(),
+                node,
+                Collections.singletonList(new int[] {0, node.getRowCount()}));
       }
+      waitForFlush(lastFlushListener);
 
       waitFuture.get(40, TimeUnit.SECONDS);
       executor.shutdown();
@@ -341,12 +337,11 @@ public class WALNodeWaitForRollFileTest {
     InsertTabletNode insertTabletNode = getInsertTabletNode(devicePath, new 
long[] {1});
     insertTabletNode.setSearchIndex(1);
     insertTabletNode.setLastFragment(true);
-    walNode.log(
-        memTable.getMemTableId(),
-        insertTabletNode,
-        Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()}));
-
-    Awaitility.await().atMost(10, TimeUnit.SECONDS).until(() -> 
walNode.isAllWALEntriesConsumed());
+    waitForFlush(
+        walNode.log(
+            memTable.getMemTableId(),
+            insertTabletNode,
+            Collections.singletonList(new int[] {0, 
insertTabletNode.getRowCount()})));
 
     // iterator cannot read the active WAL file, so hasNext() should be false
     ConsensusReqReader.ReqIterator iterator = walNode.getReqIterator(1);
@@ -443,4 +438,14 @@ public class WALNodeWaitForRollFileTest {
         columns,
         times.length);
   }
+
+  private void waitForFlush(WALFlushListener flushListener) throws Exception {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    try {
+      Future<Status> waitFuture = 
executor.submit(flushListener::waitForResult);
+      assertEquals(Status.SUCCESS, waitFuture.get(10, TimeUnit.SECONDS));
+    } finally {
+      executor.shutdownNow();
+    }
+  }
 }

Reply via email to