This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch fix-pipe-stuck-by-file
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/fix-pipe-stuck-by-file by this push:
new 3e43a341244 Subscription IT: flush after realtime data insertion in IoTDBDefaultPullConsumerDataSetIT (#15089)
3e43a341244 is described below
commit 3e43a34124425f18399e5eefdd75747ef3240d0a
Author: VGalaxies <[email protected]>
AuthorDate: Fri Mar 14 01:10:31 2025 +0800
Subscription IT: flush after realtime data insertion in IoTDBDefaultPullConsumerDataSetIT (#15089)
---
.../IoTDBDefaultPullConsumerDataSetIT.java | 30 ++++++++++++++--------
1 file changed, 20 insertions(+), 10 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
index fff68412db4..0b36deed5eb 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultPullConsumerDataSetIT.java
@@ -32,7 +32,6 @@ import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
@@ -42,6 +41,8 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import static
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
+
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2SubscriptionRegressionMisc.class})
public class IoTDBDefaultPullConsumerDataSetIT extends
AbstractSubscriptionRegressionIT {
@@ -92,7 +93,6 @@ public class IoTDBDefaultPullConsumerDataSetIT extends
AbstractSubscriptionRegre
session_src.insertTablet(tablet);
}
- @Ignore
@Test
public void do_test()
throws InterruptedException,
@@ -119,10 +119,15 @@ public class IoTDBDefaultPullConsumerDataSetIT extends
AbstractSubscriptionRegre
String sql = "select count(s_0) from " + databasePrefix + "0.d_0";
System.out.println(FORMAT.format(new Date()) + " src: " +
getCount(session_src, sql));
// Consumption data
- consume_data(consumer, session_dest);
- for (int i = 0; i < deviceCount; i++) {
- check_count(10, "select count(s_0) from " + devices.get(i), i +
":Consumption Data:s_0");
- }
+ AWAIT.untilAsserted(
+ () -> {
+ session_src.executeNonQueryStatement("flush");
+ consume_data(consumer, session_dest);
+ for (int i = 0; i < deviceCount; i++) {
+ check_count(
+ 10, "select count(s_0) from " + devices.get(i), i +
":Consumption Data:s_0");
+ }
+ });
// Unsubscribe
consumer.unsubscribe(topicName);
// Unsubscribe and then write data
@@ -135,9 +140,14 @@ public class IoTDBDefaultPullConsumerDataSetIT extends
AbstractSubscriptionRegre
System.out.println(FORMAT.format(new Date()) + " src: " +
getCount(session_src, sql));
// Consumption data: Progress is not retained when re-subscribing after
cancellation. Full
// synchronization.
- consume_data(consumer, session_dest);
- for (int i = 0; i < deviceCount; i++) {
- check_count(15, "select count(s_0) from " + devices.get(i), i +
":consume data again:s_0");
- }
+ AWAIT.untilAsserted(
+ () -> {
+ session_src.executeNonQueryStatement("flush");
+ consume_data(consumer, session_dest);
+ for (int i = 0; i < deviceCount; i++) {
+ check_count(
+ 15, "select count(s_0) from " + devices.get(i), i + ":consume
data again:s_0");
+ }
+ });
}
}