This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 1ac6984eb6b [To dev/1.3] Pipe CI: always flush for 
IoTDBPipeDataSinkIT.testReceiverAutoCreate with batch mode (#16202) (#16204)
1ac6984eb6b is described below

commit 1ac6984eb6b5cbab0ac4c092b9bf7681a9ec1959
Author: VGalaxies <[email protected]>
AuthorDate: Tue Aug 19 16:32:46 2025 +0800

    [To dev/1.3] Pipe CI: always flush for 
IoTDBPipeDataSinkIT.testReceiverAutoCreate with batch mode (#16202) (#16204)
    
    * Pipe CI: always flush for IoTDBPipeDataSinkIT.testReceiverAutoCreate with 
batch mode
    
    * fixup
---
 .../org/apache/iotdb/db/it/utils/TestUtils.java    | 48 ++++++++++++++++++++++
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    | 13 +++++-
 2 files changed, 59 insertions(+), 2 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java 
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index a793be1bf3a..168914651ed 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
 import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
@@ -694,6 +695,15 @@ public class TestUtils {
     assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600);
   }
 
+  public static void assertDataEventuallyOnEnv(
+      final BaseEnv env,
+      final String sql,
+      final String expectedHeader,
+      final Set<String> expectedResSet,
+      final Consumer<String> handleFailure) {
+    assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600, 
handleFailure);
+  }
+
   public static void assertDataEventuallyOnEnv(
       BaseEnv env,
       String sql,
@@ -723,6 +733,44 @@ public class TestUtils {
     }
   }
 
+  public static void assertDataEventuallyOnEnv(
+      final BaseEnv env,
+      final String sql,
+      final String expectedHeader,
+      final Set<String> expectedResSet,
+      final long timeoutSeconds,
+      final Consumer<String> handleFailure) {
+    try (Connection connection = env.getConnection();
+        Statement statement = connection.createStatement()) {
+      // Keep retrying if there are execution failures
+      await()
+          .pollInSameThread()
+          .pollDelay(1L, TimeUnit.SECONDS)
+          .pollInterval(1L, TimeUnit.SECONDS)
+          .atMost(timeoutSeconds, TimeUnit.SECONDS)
+          .untilAsserted(
+              () -> {
+                try {
+                  TestUtils.assertResultSetEqual(
+                      executeQueryWithRetry(statement, sql), expectedHeader, 
expectedResSet);
+                } catch (Exception e) {
+                  if (handleFailure != null) {
+                    handleFailure.accept(e.getMessage());
+                  }
+                  Assert.fail();
+                } catch (Error e) {
+                  if (handleFailure != null) {
+                    handleFailure.accept(e.getMessage());
+                  }
+                  throw e;
+                }
+              });
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+  }
+
   public static void assertDataEventuallyOnEnv(
       final BaseEnv env,
       final DataNodeWrapper dataNodeWrapper,
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 9a8ec2fc18e..5f333931117 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.function.Consumer;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
@@ -280,6 +281,12 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
 
   private void testReceiverAutoCreate(final Map<String, String> 
extractorAttributes)
       throws Exception {
+    final Consumer<String> handleFailure =
+        o -> {
+          TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+          TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+        };
+
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     final String receiverIp = receiverDataNode.getIp();
@@ -343,12 +350,14 @@ public class IoTDBPipeDataSinkIT extends 
AbstractPipeDualAutoIT {
                       
"root.ln.wf01.wt01.date,null,root.ln,DATE,TS_2DIFF,LZ4,null,null,null,null,BASE,",
                       
"root.ln.wf01.wt01.text,null,root.ln,TEXT,PLAIN,LZ4,null,null,null,null,BASE,",
                       
"root.ln.wf01.wt01.string,null,root.ln,STRING,PLAIN,LZ4,null,null,null,null,BASE,",
-                      
"root.ln.wf01.wt01.blob,null,root.ln,BLOB,PLAIN,LZ4,null,null,null,null,BASE,"))));
+                      
"root.ln.wf01.wt01.blob,null,root.ln,BLOB,PLAIN,LZ4,null,null,null,null,BASE,"))),
+          handleFailure);
       TestUtils.assertDataEventuallyOnEnv(
           receiverEnv,
           "show devices root.ln.wf01.wt02",
           "Device,IsAligned,Template,TTL(ms),",
-          Collections.singleton("root.ln.wf01.wt02,true,null,INF,"));
+          Collections.singleton("root.ln.wf01.wt02,true,null,INF,"),
+          handleFailure);
     }
   }
 

Reply via email to