This is an automated email from the ASF dual-hosted git repository. justinchen pushed a commit to branch dev/1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push: new 7bfea0038a0 [To dev/1.3] Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546) (#15852) 7bfea0038a0 is described below commit 7bfea0038a09e4b00fce47bd381756790a70bce2 Author: VGalaxies <vgalax...@apache.org> AuthorDate: Thu Jul 3 15:20:37 2025 +0800 [To dev/1.3] Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546) (#15852) * Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546) * fixup! Subscription: apply IoTConsensusV2 as cluster mode for integration test (#15546) --- .../it/IoTDBSubscriptionITConstant.java | 28 ++++++++ .../AbstractSubscriptionRegressionIT.java | 80 ++++++++++++++++------ .../IoTDBDefaultTsfilePushConsumerIT.java | 11 +++ .../IoTDBRootPullConsumeTsfileIT.java | 10 +++ .../multi/IoTDBOneConsumerMultiTopicsTsfileIT.java | 10 +++ .../IoTDBDevicePatternPullConsumerDataSetIT.java | 31 ++++++--- ...IoTDBMiddleMatchPatternPullConsumeTsfileIT.java | 28 ++++---- ...oTDBSnapshotTSPatternDatasetPushConsumerIT.java | 14 +++- .../multi/IoTDBMultiGroupVsMultiConsumerIT.java | 10 +++ 9 files changed, 178 insertions(+), 44 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java index 5b8ec393274..3162139fb66 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionITConstant.java @@ -19,10 +19,15 @@ package org.apache.iotdb.subscription.it; +import org.apache.iotdb.consensus.ConsensusFactory; +import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.session.Session; + import org.awaitility.Awaitility; import org.awaitility.core.ConditionFactory; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; public class IoTDBSubscriptionITConstant { @@ -40,4 +45,27 @@ public class IoTDBSubscriptionITConstant { public static final long SLEEP_NS = 1_000_000_000L; public static final long POLL_TIMEOUT_MS = 10_000L; + + @FunctionalInterface + public interface WrappedVoidSupplier { + void get() throws Throwable; + } + + public static void AWAIT_WITH_FLUSH(final Session session, final WrappedVoidSupplier assertions) { + AWAIT.untilAsserted( + () -> { + session.executeNonQueryStatement("flush"); + assertions.get(); + }); + } + + public static Consumer<BaseEnv> FORCE_SCALABLE_SINGLE_NODE_MODE = + env -> + env.getConfig() + .getCommonConfig() + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS) + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1); } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java index fe32817ad00..0eb5798599d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/AbstractSubscriptionRegressionIT.java @@ -29,6 +29,7 @@ import org.apache.iotdb.session.subscription.SubscriptionSession; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessage; import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.WrappedVoidSupplier; import org.apache.iotdb.subscription.it.triple.AbstractSubscriptionTripleIT; import org.apache.thrift.TException; @@ -57,6 +58,7 @@ import java.util.Objects; import java.util.Properties; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.AWAIT; import static org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS; public abstract class AbstractSubscriptionRegressionIT extends AbstractSubscriptionTripleIT { @@ -359,26 +361,6 @@ public abstract class AbstractSubscriptionRegressionIT extends AbstractSubscript return results; } - public static void consume_data_long( - SubscriptionPullConsumer consumer, Session session, Long timeout) - throws StatementExecutionException, InterruptedException, IoTDBConnectionException { - timeout = System.currentTimeMillis() + timeout; - while (System.currentTimeMillis() < timeout) { - List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); - if (messages.isEmpty()) { - Thread.sleep(1000); - } - for (final SubscriptionMessage message : messages) { - for (final Iterator<Tablet> it = message.getSessionDataSetsHandler().tabletIterator(); - it.hasNext(); ) { - final Tablet tablet = it.next(); - session.insertTablet(tablet); - } - } - consumer.commitSync(messages); - } - } - public void consume_data(SubscriptionPullConsumer consumer) throws TException, IOException, @@ -388,6 +370,64 @@ public abstract class AbstractSubscriptionRegressionIT extends AbstractSubscript consume_data(consumer, session_dest); } + public void consume_data_await( + SubscriptionPullConsumer consumer, Session session, List<WrappedVoidSupplier> assertions) { + AWAIT.untilAsserted( + () -> { + List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + if (messages.isEmpty()) { + session_src.executeNonQueryStatement("flush"); + } + for (final SubscriptionMessage message : messages) { + for (final Iterator<Tablet> it = message.getSessionDataSetsHandler().tabletIterator(); + it.hasNext(); ) { + final Tablet tablet = it.next(); + session.insertTablet(tablet); + } + } + consumer.commitSync(messages); + for (final WrappedVoidSupplier assertion : assertions) { + assertion.get(); + } + }); + } + + public void consume_tsfile_await( + SubscriptionPullConsumer consumer, List<String> devices, List<Integer> expected) { + final List<AtomicInteger> counters = new ArrayList<>(devices.size()); + for (int i = 0; i < devices.size(); i++) { + counters.add(new AtomicInteger(0)); + } + AWAIT.untilAsserted( + () -> { + List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(POLL_TIMEOUT_MS)); + if (messages.isEmpty()) { + session_src.executeNonQueryStatement("flush"); + } + for (final SubscriptionMessage message : messages) { + final SubscriptionTsFileHandler tsFileHandler = message.getTsFileHandler(); + try (final TsFileReader tsFileReader = tsFileHandler.openReader()) { + for (int i = 0; i < devices.size(); i++) { + final Path path = new Path(devices.get(i), "s_0", true); + final QueryDataSet dataSet = + tsFileReader.query( + QueryExpression.create(Collections.singletonList(path), null)); + while (dataSet.hasNext()) { + dataSet.next(); + counters.get(i).addAndGet(1); + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + consumer.commitSync(messages); + for (int i = 0; i < devices.size(); i++) { + assertEquals(counters.get(i).get(), expected.get(i)); + } + }); + } + //////////////////////////// strict assertions //////////////////////////// public static void assertEquals(int actual, int expected) { diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java index e56e8cef24d..ac3ee07bd7f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBDefaultTsfilePushConsumerIT.java @@ -26,6 +26,7 @@ import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.subscription.consumer.AckStrategy; import org.apache.iotdb.session.subscription.consumer.ConsumeResult; import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT; import org.apache.thrift.TException; @@ -80,6 +81,15 @@ public class IoTDBDefaultTsfilePushConsumerIT extends AbstractSubscriptionRegres } } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { @@ -106,6 +116,7 @@ public class IoTDBDefaultTsfilePushConsumerIT extends AbstractSubscriptionRegres timestamp += 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java index f513e66ea2b..6106a392c66 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/auto_create_db/IoTDBRootPullConsumeTsfileIT.java @@ -24,6 +24,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionMis import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT; import org.apache.thrift.TException; @@ -68,6 +69,15 @@ public class IoTDBRootPullConsumeTsfileIT extends AbstractSubscriptionRegression session_src.executeNonQueryStatement("create database root.RootPullConsumeTsfile"); } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java index 8ae086e6151..a28f617f32d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/multi/IoTDBOneConsumerMultiTopicsTsfileIT.java @@ -24,6 +24,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionCon import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT; import org.apache.thrift.TException; @@ -82,6 +83,15 @@ public class IoTDBOneConsumerMultiTopicsTsfileIT extends AbstractSubscriptionReg assertTrue(subs.getTopic(topicName2).isPresent(), "Create show topics 2"); } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java index 1e6a59a7ace..3dbb923baed 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBDevicePatternPullConsumerDataSetIT.java @@ -40,6 +40,7 @@ import org.junit.runner.RunWith; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; @RunWith(IoTDBTestRunner.class) @@ -114,6 +115,7 @@ public class IoTDBDevicePatternPullConsumerDataSetIT extends AbstractSubscriptio timestamp += row * 2000; } session_src.insertTablet(tablet); + session_src.executeNonQueryStatement("flush"); } @Test @@ -131,13 +133,18 @@ public class IoTDBDevicePatternPullConsumerDataSetIT extends AbstractSubscriptio assertEquals(subs.getSubscriptions().size(), 1, "show subscriptions after subscription"); insert_data(System.currentTimeMillis() - 30000L); // Consumption data - consume_data(consumer, session_dest); String sql = "select count(s_0) from " + device; - System.out.println("src: " + getCount(session_src, sql)); - check_count(8, sql, "Consumption data:" + pattern); - check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); - check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1"); - check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2"); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + System.out.println("src: " + getCount(session_src, sql)); + check_count(8, sql, "Consumption data:" + pattern); + check_count(8, "select count(s_1) from " + device, "Consumption data: s_1"); + check_count(0, "select count(s_0) from " + database + ".d_1", "Consumption data:d_1"); + check_count(0, "select count(s_0) from " + device2, "Consumption data:d_2"); + })); insert_data(System.currentTimeMillis()); // Unsubscribe consumer.unsubscribe(topicName); @@ -148,8 +155,14 @@ public class IoTDBDevicePatternPullConsumerDataSetIT extends AbstractSubscriptio System.out.println("src: " + getCount(session_src, sql)); // Consumption data: Progress is not retained after unsubscribing and then re-subscribing. Full // synchronization. - consume_data(consumer, session_dest); - check_count(12, "select count(s_0) from " + device, "consume data again:s_0"); - check_count(12, "select count(s_1) from " + device, "Consumption data: s_1"); + consume_data_await( + consumer, + session_dest, + Collections.singletonList( + () -> { + System.out.println("src: " + getCount(session_src, sql)); + check_count(12, "select count(s_0) from " + device, "consume data again:s_0"); + check_count(12, "select count(s_1) from " + device, "Consumption data: s_1"); + })); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java index 291f666b9ea..e667d8212c2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBMiddleMatchPatternPullConsumeTsfileIT.java @@ -24,6 +24,7 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2SubscriptionRegressionCon import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT; import org.apache.thrift.TException; @@ -40,6 +41,7 @@ import org.junit.runner.RunWith; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /*** @@ -92,6 +94,16 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT extends AbstractSubscrip assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics"); } + // TODO: remove it later + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { @@ -149,11 +161,7 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT extends AbstractSubscrip devices.add(device); devices.add(device2); devices.add(database2 + ".d_2"); - - List<Integer> rowCounts = consume_tsfile(consumer, devices); - assertEquals(rowCounts.get(0), 10); - assertEquals(rowCounts.get(1), 1); - assertEquals(rowCounts.get(2), 1); + consume_tsfile_await(consumer, devices, Arrays.asList(10, 1, 1)); // Unsubscribe consumer.unsubscribe(topicName); assertEquals(subs.getSubscriptions().size(), 0, "Show subscriptions after cancellation"); @@ -163,14 +171,6 @@ public class IoTDBMiddleMatchPatternPullConsumeTsfileIT extends AbstractSubscrip insert_data(1707782400000L); // 2024-02-13 08:00:00+08:00 // Consumption data: Progress is not retained after canceling and re-subscribing. Full // synchronization. - rowCounts = consume_tsfile(consumer, devices); - - assertEquals( - rowCounts.get(0), - 15, - "Unsubscribe and resubscribe, progress is not retained. Full synchronization."); - assertEquals( - rowCounts.get(1), 1, "Cancel subscription and subscribe again," + database + ".d_1"); - assertEquals(rowCounts.get(2), 1, "Unsubscribe and resubscribe," + database2 + ".d_2"); + consume_tsfile_await(consumer, devices, Arrays.asList(15, 1, 1)); } } diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java index ea4bc9d811f..f19f0594537 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/mode/IoTDBSnapshotTSPatternDatasetPushConsumerIT.java @@ -28,6 +28,7 @@ import org.apache.iotdb.session.subscription.consumer.AckStrategy; import org.apache.iotdb.session.subscription.consumer.ConsumeResult; import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.Retry; import org.apache.iotdb.subscription.it.RetryRule; import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT; @@ -104,6 +105,16 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT extends AbstractSubscri assertTrue(subs.getTopic(topicName).isPresent(), "Create show topics"); } + // TODO: remove it later + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception { @@ -194,7 +205,8 @@ public class IoTDBSnapshotTSPatternDatasetPushConsumerIT extends AbstractSubscri // Consumption data: Progress is not retained when re-subscribing after cancellation. Full // synchronization. - AWAIT.untilAsserted( + IoTDBSubscriptionITConstant.AWAIT_WITH_FLUSH( + session_src, () -> { check_count(12, "select count(s_0) from " + device, "consume data again:s_0 " + device); check_count(0, "select count(s_1) from " + device, "Consumption data: s_1 " + device); diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java index 0751e922820..af39d745933 100644 --- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/multi/IoTDBMultiGroupVsMultiConsumerIT.java @@ -27,6 +27,7 @@ import org.apache.iotdb.session.subscription.consumer.ConsumeResult; import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer; import org.apache.iotdb.session.subscription.payload.SubscriptionMessageType; import org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet; +import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant; import org.apache.iotdb.subscription.it.triple.regression.AbstractSubscriptionRegressionIT; import org.apache.thrift.TException; @@ -121,6 +122,15 @@ public class IoTDBMultiGroupVsMultiConsumerIT extends AbstractSubscriptionRegres subs.getTopics().forEach(System.out::println); } + @Override + protected void setUpConfig() { + super.setUpConfig(); + + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(sender); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver1); + IoTDBSubscriptionITConstant.FORCE_SCALABLE_SINGLE_NODE_MODE.accept(receiver2); + } + @Override @After public void tearDown() throws Exception {