This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1ac6984eb6b [To dev/1.3] Pipe CI: always flush for
IoTDBPipeDataSinkIT.testReceiverAutoCreate with batch mode (#16202) (#16204)
1ac6984eb6b is described below
commit 1ac6984eb6b5cbab0ac4c092b9bf7681a9ec1959
Author: VGalaxies <[email protected]>
AuthorDate: Tue Aug 19 16:32:46 2025 +0800
[To dev/1.3] Pipe CI: always flush for
IoTDBPipeDataSinkIT.testReceiverAutoCreate with batch mode (#16202) (#16204)
* Pipe CI: always flush for IoTDBPipeDataSinkIT.testReceiverAutoCreate with
batch mode
* fixup
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 48 ++++++++++++++++++++++
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 13 +++++-
2 files changed, 59 insertions(+), 2 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index a793be1bf3a..168914651ed 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -48,6 +48,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import static org.apache.iotdb.itbase.constant.TestConstant.DELTA;
import static org.apache.iotdb.itbase.constant.TestConstant.NULL;
@@ -694,6 +695,15 @@ public class TestUtils {
assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600);
}
+ public static void assertDataEventuallyOnEnv( // Convenience overload: same check as the six-argument overload, with a fixed timeout.
+ final BaseEnv env,
+ final String sql,
+ final String expectedHeader,
+ final Set<String> expectedResSet,
+ final Consumer<String> handleFailure) { // forwarded unchanged to the six-argument overload
+ assertDataEventuallyOnEnv(env, sql, expectedHeader, expectedResSet, 600, // 600 fills the timeoutSeconds parameter
handleFailure);
+ }
+
public static void assertDataEventuallyOnEnv(
BaseEnv env,
String sql,
@@ -723,6 +733,44 @@ public class TestUtils {
}
}
+ public static void assertDataEventuallyOnEnv( // Polls the query via await() until the result assertion passes or the atMost(timeoutSeconds) bound is hit.
+ final BaseEnv env, // supplies the connection that is reused for every attempt
+ final String sql, // query executed on each poll
+ final String expectedHeader,
+ final Set<String> expectedResSet,
+ final long timeoutSeconds, // upper bound handed to atMost(...) below, in seconds
+ final Consumer<String> handleFailure) { // may be null; otherwise invoked with the failure message after each failed attempt
+ try (Connection connection = env.getConnection();
+ Statement statement = connection.createStatement()) {
+ // Keep retrying if there are execution failures
+ await()
+ .pollInSameThread()
+ .pollDelay(1L, TimeUnit.SECONDS)
+ .pollInterval(1L, TimeUnit.SECONDS)
+ .atMost(timeoutSeconds, TimeUnit.SECONDS)
+ .untilAsserted( // NOTE(review): presumably Awaitility, which retries while the lambda throws an assertion failure - confirm
+ () -> {
+ try {
+ TestUtils.assertResultSetEqual(
+ executeQueryWithRetry(statement, sql), expectedHeader,
expectedResSet);
+ } catch (Exception e) {
+ if (handleFailure != null) {
+ handleFailure.accept(e.getMessage()); // the callback only receives the message, which can be null
+ }
+ Assert.fail(); // turns the exception into a message-less assertion failure; the original exception is not propagated
+ } catch (Error e) { // Error covers AssertionError, e.g. if the comparison above raises one
+ if (handleFailure != null) {
+ handleFailure.accept(e.getMessage());
+ }
+ throw e; // rethrown unchanged after the callback has run
+ }
+ });
+ } catch (Exception e) { // e.g. connection/statement setup failure; NOTE(review): presumably also the await timeout - confirm
+ e.printStackTrace();
+ fail();
+ }
+ }
+
public static void assertDataEventuallyOnEnv(
final BaseEnv env,
final DataNodeWrapper dataNodeWrapper,
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 9a8ec2fc18e..5f333931117 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -38,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.function.Consumer;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
@@ -280,6 +281,12 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
private void testReceiverAutoCreate(final Map<String, String>
extractorAttributes)
throws Exception {
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
+
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -343,12 +350,14 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
"root.ln.wf01.wt01.date,null,root.ln,DATE,TS_2DIFF,LZ4,null,null,null,null,BASE,",
"root.ln.wf01.wt01.text,null,root.ln,TEXT,PLAIN,LZ4,null,null,null,null,BASE,",
"root.ln.wf01.wt01.string,null,root.ln,STRING,PLAIN,LZ4,null,null,null,null,BASE,",
-
"root.ln.wf01.wt01.blob,null,root.ln,BLOB,PLAIN,LZ4,null,null,null,null,BASE,"))));
+
"root.ln.wf01.wt01.blob,null,root.ln,BLOB,PLAIN,LZ4,null,null,null,null,BASE,"))),
+ handleFailure);
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"show devices root.ln.wf01.wt02",
"Device,IsAligned,Template,TTL(ms),",
- Collections.singleton("root.ln.wf01.wt02,true,null,INF,"));
+ Collections.singleton("root.ln.wf01.wt02,true,null,INF,"),
+ handleFailure);
}
}