This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7bfea0038a0 [To dev/1.3] Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546) (#15852)
7bfea0038a0 is described below

commit 7bfea0038a09e4b00fce47bd381756790a70bce2
Author: VGalaxies <vgalax...@apache.org>
AuthorDate: Thu Jul 3 15:20:37 2025 +0800

    [To dev/1.3] Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546) (#15852)
    
    * Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546)
    
    * fixup! Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546)
---
 .../it/IoTDBSubscriptionITConstant.java            | 28 ++++++++
 .../AbstractSubscriptionRegressionIT.java          | 80 ++++++++++++++++------
 .../IoTDBDefaultTsfilePushConsumerIT.java          | 11 +++
 .../IoTDBRootPullConsumeTsfileIT.java              | 10 +++
 .../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 10 +++
 .../IoTDBDevicePatternPullConsumerDataSetIT.java   | 31 ++++++---
 ...IoTDBMiddleMatchPatternPullConsumeTsfileIT.java | 28 ++++----
 ...oTDBSnapshotTSPatternDatasetPushConsumerIT.java | 14 +++-
 .../multi/IoTDBMultiGroupVsMultiConsumerIT.java    | 10 +++
 9 files changed, 178 insertions(+), 44 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
index 5b8ec393274..3162139fb66 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java
@@ -19,10 +19,15 @@
 
 package org.apache.iotdb.subscription.it;
 
+import org.apache.iotdb.consensus.ConsensusFactory;
+import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.session.Session;
+
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionFactory;
 
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 public class IoTDBSubscriptionITConstant {
 
@@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant {
 
   public static final long SLEEP_NS = 1_000_000_000L;
   public static final long POLL_TIMEOUT_MS = 10_000L;
+
+  @FunctionalInterface
+  public interface WrappedVoidSupplier {
+    void get() throws Throwable;
+  }
+
+  public static void AWAIT_WITH_FLUSH(final Session session, final 
WrappedVoidSupplier assertions) {
+    AWAIT.untilAsserted(
+        () -> {
+          session.executeNonQueryStatement("flush");
+          assertions.get();
+        });
+  }
+
+  public static Consumer<BaseEnv> FORCE_SCALABLE_SINGLE_NODE_MODE =
+      env ->
+          env.getConfig()
+              .getCommonConfig()
+              
.setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+              
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+              
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+              .setSchemaReplicationFactor(1)
+              .setDataReplicationFactor(1);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
index fe32817ad00..0eb5798599d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.session.subscription.SubscriptionSession;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
+import 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier;
 import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT;
 
 import org.apache.thrift.TException;
@@ -57,6 +58,7 @@ import java.util.Objects;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT;
 import static 
org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS;
 
 public abstract class AbstractSubscriptionRegressionIT extends 
AbstractSubscriptionTripleIT {
@@ -359,26 +361,6 @@ public abstract class AbstractSubscriptionRegressionIT 
extends AbstractSubscript
     return results;
   }
 
-  public static void consume_data_long(
-      SubscriptionPullConsumer consumer, Session session, Long timeout)
-      throws StatementExecutionException, InterruptedException, 
IoTDBConnectionException {
-    timeout = System.currentTimeMillis() + timeout;
-    while (System.currentTimeMillis() < timeout) {
-      List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
-      if (messages.isEmpty()) {
-        Thread.sleep(1000);
-      }
-      for (final SubscriptionMessage message : messages) {
-        for (final Iterator<Tablet> it = 
message.getSessionDataSetsHandler().tabletIterator();
-            it.hasNext(); ) {
-          final Tablet tablet = it.next();
-          session.insertTablet(tablet);
-        }
-      }
-      consumer.commitSync(messages);
-    }
-  }
-
   public void consume_data(SubscriptionPullConsumer consumer)
       throws TException,
           IOException,
@@ -388,6 +370,64 @@ public abstract class AbstractSubscriptionRegressionIT 
extends AbstractSubscript
     consume_data(consumer, session_dest);
   }
 
+  public void consume_data_await(
+      SubscriptionPullConsumer consumer, Session session, 
List<WrappedVoidSupplier> assertions) {
+    AWAIT.untilAsserted(
+        () -> {
+          List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+          if (messages.isEmpty()) {
+            session_src.executeNonQueryStatement("flush");
+          }
+          for (final SubscriptionMessage message : messages) {
+            for (final Iterator<Tablet> it = 
message.getSessionDataSetsHandler().tabletIterator();
+                it.hasNext(); ) {
+              final Tablet tablet = it.next();
+              session.insertTablet(tablet);
+            }
+          }
+          consumer.commitSync(messages);
+          for (final WrappedVoidSupplier assertion : assertions) {
+            assertion.get();
+          }
+        });
+  }
+
+  public void consume_tsfile_await(
+      SubscriptionPullConsumer consumer, List<String> devices, List<Integer> 
expected) {
+    final List<AtomicInteger> counters = new ArrayList<>(devices.size());
+    for (int i = 0; i < devices.size(); i++) {
+      counters.add(new AtomicInteger(0));
+    }
+    AWAIT.untilAsserted(
+        () -> {
+          List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS));
+          if (messages.isEmpty()) {
+            session_src.executeNonQueryStatement("flush");
+          }
+          for (final SubscriptionMessage message : messages) {
+            final SubscriptionTsFileHandler tsFileHandler = 
message.getTsFileHandler();
+            try (final TsFileReader tsFileReader = tsFileHandler.openReader()) 
{
+              for (int i = 0; i < devices.size(); i++) {
+                final Path path = new Path(devices.get(i), "s_0", true);
+                final QueryDataSet dataSet =
+                    tsFileReader.query(
+                        
QueryExpression.create(Collections.singletonList(path), null));
+                while (dataSet.hasNext()) {
+                  dataSet.next();
+                  counters.get(i).addAndGet(1);
+                }
+              }
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+          consumer.commitSync(messages);
+          for (int i = 0; i < devices.size(); i++) {
+            assertEquals(counters.get(i).get(), expected.get(i));
+          }
+        });
+  }
+
   //////////////////////////// strict assertions ////////////////////////////
 
   public static void assertEquals(int actual, int expected) {
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
index e56e8cef24d..ac3ee07bd7f 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.AckStrategy;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -80,6 +81,15 @@ public class IoTDBDefaultTsfilePushConsumerIT extends 
AbstractSubscriptionRegres
     }
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -106,6 +116,7 @@ public class IoTDBDefaultTsfilePushConsumerIT extends 
AbstractSubscriptionRegres
       timestamp += 2000;
     }
     session_src.insertTablet(tablet);
+    session_src.executeNonQueryStatement("flush");
   }
 
   @Test
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
index f513e66ea2b..6106a392c66 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMis
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -68,6 +69,15 @@ public class IoTDBRootPullConsumeTsfileIT extends 
AbstractSubscriptionRegression
     session_src.executeNonQueryStatement("create database 
root.RootPullConsumeTsfile");
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
index 8ae086e6151..a28f617f32d 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionCon
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -82,6 +83,15 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends 
AbstractSubscriptionReg
     assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2");
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
index 1e6a59a7ace..3dbb923baed 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java
@@ -40,6 +40,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 @RunWith(IoTDBTestRunner.class)
@@ -114,6 +115,7 @@ public class IoTDBDevicePatternPullConsumerDataSetIT 
extends AbstractSubscriptio
       timestamp += row * 2000;
     }
     session_src.insertTablet(tablet);
+    session_src.executeNonQueryStatement("flush");
   }
 
   @Test
@@ -131,13 +133,18 @@ public class IoTDBDevicePatternPullConsumerDataSetIT 
extends AbstractSubscriptio
     assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after 
subscription");
     insert_data(System.currentTimeMillis() - 30000L);
     // Consumption data
-    consume_data(consumer, session_dest);
     String sql = "select count(s_0) from " + device;
-    System.out.println("src: " + getCount(session_src, sql));
-    check_count(8, sql, "Consumption data:" + pattern);
-    check_count(8, "select count(s_1) from " + device, "Consumption data: 
s_1");
-    check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption 
data:d_1");
-    check_count(0, "select count(s_0) from " + device2, "Consumption 
data:d_2");
+    consume_data_await(
+        consumer,
+        session_dest,
+        Collections.singletonList(
+            () -> {
+              System.out.println("src: " + getCount(session_src, sql));
+              check_count(8, sql, "Consumption data:" + pattern);
+              check_count(8, "select count(s_1) from " + device, "Consumption 
data: s_1");
+              check_count(0, "select count(s_0) from " + database + ".d_1", 
"Consumption data:d_1");
+              check_count(0, "select count(s_0) from " + device2, "Consumption 
data:d_2");
+            }));
     insert_data(System.currentTimeMillis());
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -148,8 +155,14 @@ public class IoTDBDevicePatternPullConsumerDataSetIT 
extends AbstractSubscriptio
     System.out.println("src: " + getCount(session_src, sql));
     // Consumption data: Progress is not retained after unsubscribing and then 
re-subscribing. Full
     // synchronization.
-    consume_data(consumer, session_dest);
-    check_count(12, "select count(s_0) from " + device, "consume data 
again:s_0");
-    check_count(12, "select count(s_1) from " + device, "Consumption data: 
s_1");
+    consume_data_await(
+        consumer,
+        session_dest,
+        Collections.singletonList(
+            () -> {
+              System.out.println("src: " + getCount(session_src, sql));
+              check_count(12, "select count(s_0) from " + device, "consume 
data again:s_0");
+              check_count(12, "select count(s_1) from " + device, "Consumption 
data: s_1");
+            }));
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
index 291f666b9ea..e667d8212c2 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionCon
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -40,6 +41,7 @@ import org.junit.runner.RunWith;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 /***
@@ -92,6 +94,16 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT 
extends AbstractSubscrip
     assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
   }
 
+  // TODO: remove it later
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -149,11 +161,7 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT 
extends AbstractSubscrip
     devices.add(device);
     devices.add(device2);
     devices.add(database2 + ".d_2");
-
-    List<Integer> rowCounts = consume_tsfile(consumer, devices);
-    assertEquals(rowCounts.get(0), 10);
-    assertEquals(rowCounts.get(1), 1);
-    assertEquals(rowCounts.get(2), 1);
+    consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1));
     // Unsubscribe
     consumer.unsubscribe(topicName);
     assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after 
cancellation");
@@ -163,14 +171,6 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT 
extends AbstractSubscrip
     insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00
     // Consumption data: Progress is not retained after canceling and 
re-subscribing. Full
     // synchronization.
-    rowCounts = consume_tsfile(consumer, devices);
-
-    assertEquals(
-        rowCounts.get(0),
-        15,
-        "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
-    assertEquals(
-        rowCounts.get(1), 1, "Cancel subscription and subscribe again," + 
database + ".d_1");
-    assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," + 
database2 + ".d_2");
+    consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1));
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
index ea4bc9d811f..f19f0594537 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java
@@ -28,6 +28,7 @@ import 
org.apache.iotdb.session.subscription.consumer.AckStrategy;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import org.apache.iotdb.subscription.it.Retry;
 import org.apache.iotdb.subscription.it.RetryRule;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
@@ -104,6 +105,16 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT 
extends AbstractSubscri
     assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics");
   }
 
+  // TODO: remove it later
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
@@ -194,7 +205,8 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT 
extends AbstractSubscri
 
     // Consumption data: Progress is not retained when re-subscribing after 
cancellation. Full
     // synchronization.
-    AWAIT.untilAsserted(
+    IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH(
+        session_src,
         () -> {
           check_count(12, "select count(s_0) from " + device, "consume data 
again:s_0 " + device);
           check_count(0, "select count(s_1) from " + device, "Consumption 
data: s_1 " + device);
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
index 0751e922820..af39d745933 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 import 
org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT;
 
 import org.apache.thrift.TException;
@@ -121,6 +122,15 @@ public class IoTDBMultiGroupVsMultiConsumerIT extends 
AbstractSubscriptionRegres
     subs.getTopics().forEach(System.out::println);
   }
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1);
+    
IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2);
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {

Reply via email to