This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cd6ba732a42 Subscription: fix some issues on DN and session & improve IT (#12254)
cd6ba732a42 is described below
commit cd6ba732a428585d9036121c97ca272b0fc2d3c5
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Mar 29 23:21:41 2024 +0800
Subscription: fix some issues on DN and session & improve IT (#12254)
---
.../it/{ => dual}/AbstractSubscriptionDualIT.java | 2 +-
.../IoTDBSubscriptionConsumerGroupIT.java | 58 +++---
.../it/{ => dual}/IoTDBSubscriptionTopicIT.java | 113 +++++------
.../it/local/IoTDBSubscriptionBasicIT.java | 157 +++++++++++++++
.../it/local/IoTDBSubscriptionIdempotentIT.java | 162 ++++++++++++++++
.../IoTDBSubscriptionRestartIT.java} | 212 +++++++++------------
.../session/subscription/SubscriptionSession.java | 3 +-
.../broker/SerializedEnrichedEvent.java | 2 +-
.../apache/iotdb/commons/conf/CommonConfig.java | 14 +-
.../iotdb/commons/conf/CommonDescriptor.java | 6 +-
.../subscription/config/SubscriptionConfig.java | 8 +-
11 files changed, 515 insertions(+), 222 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionDualIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
similarity index 97%
rename from integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionDualIT.java
rename to integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 46e537cd7f6..6f2dbd41230 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionDualIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.dual;
import org.apache.iotdb.it.env.MultiEnvFactory;
import org.apache.iotdb.itbase.env.BaseEnv;
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
similarity index 93%
rename from integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java
rename to integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 63059492eb0..54d3867c99c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.dual;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -68,7 +68,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
public void test3C1CGSubscribeOneTopicHistoricalData() throws Exception {
final long currentTime = System.currentTimeMillis();
- // history data
+ // Historical data
insertData(currentTime);
createTopics(currentTime);
@@ -93,7 +93,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
public void test3C3CGSubscribeOneTopicHistoricalData() throws Exception {
final long currentTime = System.currentTimeMillis();
- // history data
+ // Historical data
insertData(currentTime);
createTopics(currentTime);
@@ -120,7 +120,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
public void test3C1CGSubscribeTwoTopicHistoricalData() throws Exception {
final long currentTime = System.currentTimeMillis();
- // history data
+ // Historical data
insertData(currentTime);
createTopics(currentTime);
@@ -146,7 +146,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
public void test3C3CGSubscribeTwoTopicHistoricalData() throws Exception {
final long currentTime = System.currentTimeMillis();
- // history data
+ // Historical data
insertData(currentTime);
createTopics(currentTime);
@@ -174,7 +174,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
public void test4C2CGSubscribeTwoTopicHistoricalData() throws Exception {
final long currentTime = System.currentTimeMillis();
- // history data
+ // Historical data
insertData(currentTime);
createTopics(currentTime);
@@ -213,7 +213,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
- // realtime data
+ // Realtime data
insertData(currentTime);
pollMessagesAndCheck(
@@ -320,7 +320,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
- // realtime data
+ // Realtime data
insertData(currentTime);
pollMessagesAndCheck(
@@ -338,22 +338,22 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
/////////////////////////////// utility ///////////////////////////////
- private void createTopics(long currentTime) {
+ private void createTopics(final long currentTime) {
// Create topics on sender
try (final ISession session = senderEnv.getSessionConnection()) {
session.executeNonQueryStatement(
String.format("create topic topic1 with ('end-time'='%s')",
currentTime - 1));
session.executeNonQueryStatement(
String.format("create topic topic2 with ('start-time'='%s')",
currentTime));
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
- private void insertData(long currentTime) {
+ private void insertData(final long currentTime) {
// Insert some data on sender
- try (ISession session = senderEnv.getSessionConnection()) {
+ try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
String.format("insert into root.topic1(time, s) values (%s, 1)",
i)); // topic1
@@ -362,13 +362,13 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
"insert into root.topic2(time, s) values (%s, 1)", currentTime
+ i)); // topic2
}
session.executeNonQueryStatement("flush");
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
- private void createPipes(long currentTime) {
+ private void createPipes(final long currentTime) {
// For sync reference
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
@@ -390,7 +390,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
@@ -415,14 +415,15 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));
Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
- String consumerId, String consumerGroupId, String... topicNames) throws Exception {
+ final String consumerId, final String consumerGroupId, final String... topicNames)
+ throws Exception {
final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(senderEnv.getIP())
@@ -437,7 +438,8 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
}
private void pollMessagesAndCheck(
- List<SubscriptionPullConsumer> consumers, Map<String, String> expectedHeaderWithResult)
+ final List<SubscriptionPullConsumer> consumers,
+ final Map<String, String> expectedHeaderWithResult)
throws Exception {
final AtomicBoolean isClosed = new AtomicBoolean(false);
final AtomicBoolean receiverCrashed = new AtomicBoolean(false);
@@ -447,14 +449,14 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
final int index = i;
final String consumerId = consumers.get(index).getConsumerId();
final String consumerGroupId = consumers.get(index).getConsumerGroupId();
- Thread t =
+ final Thread t =
new Thread(
() -> {
try (final SubscriptionPullConsumer consumer =
consumers.get(index)) {
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
@@ -480,14 +482,15 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
consumer.commitSync(messages);
}
// No need to unsubscribe
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
// Avoid failure
} finally {
- LOGGER.info("consumer {} (group {}) exiting...", consumerId, consumerGroupId);
+ LOGGER.info(
+ "consumer {} (consumer group {}) exiting...", consumerId, consumerGroupId);
}
},
- String.format("%s_%s", consumerGroupId, consumerId));
+ String.format("%s_%s", consumerId, consumerGroupId));
t.start();
threads.add(t);
}
@@ -512,12 +515,12 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
expectedHeaderWithResult);
});
}
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
isClosed.set(true);
- for (Thread thread : threads) {
+ for (final Thread thread : threads) {
thread.join();
}
}
@@ -525,7 +528,8 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
/** @return false -> receiver crashed */
private boolean insertRowRecordEnrichedByConsumerGroupId(
- List<String> columnNameList, RowRecord record, String consumerGroupId) throws Exception {
+ final List<String> columnNameList, final RowRecord record, final String consumerGroupId)
+ throws Exception {
if (columnNameList.size() != 2) {
LOGGER.warn("unexpected column name list: {}", columnNameList);
throw new Exception("unexpected column name list");
@@ -545,7 +549,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends
AbstractSubscriptionDualIT
return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
} else {
LOGGER.warn("unexpected column name: {}", columnName);
- throw new Exception("unexpected column name list");
+ throw new Exception("unexpected column name");
}
}
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionTopicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
similarity index 82%
rename from integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionTopicIT.java
rename to integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 5c861a6c639..e345c7232a3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionTopicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.dual;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
@@ -59,7 +59,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
@Test
public void testTopicPathSubscription() throws Exception {
- // insert some history data on sender
+ // Insert some historical data on sender
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
session.executeNonQueryStatement(
@@ -72,30 +72,30 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
String.format("insert into root.db.t1(time, s1) values (%s, 1)",
i));
}
session.executeNonQueryStatement("flush");
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- // create topic on sender
+ // Create topic on sender
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
- try (SubscriptionSession session = new SubscriptionSession(host, port)) {
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
- Properties config = new Properties();
+ final Properties config = new Properties();
config.put(TopicConstant.PATH_KEY, "root.db.*.s");
session.createTopic("topic1", config);
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- // subscribe on sender and insert on receiver
+ // Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
() -> {
- try (SubscriptionPullConsumer consumer =
+ try (final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
.host(host)
.port(port)
@@ -103,48 +103,49 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
.consumerGroupId("cg1")
.autoCommit(false)
.buildPullConsumer();
- ISession session = receiverEnv.getSessionConnection()) {
+ final ISession session = receiverEnv.getSessionConnection()) {
consumer.open();
consumer.subscribe("topic1");
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
break;
}
- List<SubscriptionMessage> messages = consumer.poll(Duration.ofMillis(10000));
+ final List<SubscriptionMessage> messages =
+ consumer.poll(Duration.ofMillis(10000));
if (messages.isEmpty()) {
continue;
}
- for (SubscriptionMessage message : messages) {
- SubscriptionSessionDataSets payload =
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionSessionDataSets payload =
(SubscriptionSessionDataSets) message.getPayload();
- for (Iterator<Tablet> it = payload.tabletIterator(); it.hasNext(); ) {
- Tablet tablet = it.next();
+ for (final Iterator<Tablet> it = payload.tabletIterator(); it.hasNext(); ) {
+ final Tablet tablet = it.next();
session.insertTablet(tablet);
}
}
consumer.commitSync(messages);
}
consumer.unsubscribe("topic1");
- LOGGER.info(
- "consumer {} (group {}) exiting...",
- consumer.getConsumerId(),
- consumer.getConsumerGroupId());
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
- // avoid fail
+ // Avoid fail
+ } finally {
+ LOGGER.info("consumer exiting...");
}
});
thread.start();
- // check data on receiver
+ // Check data on receiver
try {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
- .atMost(100, TimeUnit.SECONDS)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
@@ -156,7 +157,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
}));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
@@ -167,7 +168,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
@Test
public void testTopicTimeSubscription() throws Exception {
- // insert some history data on sender
+ // Insert some historical data on sender
final long currentTime = System.currentTimeMillis();
try (final ISession session = senderEnv.getSessionConnection()) {
for (int i = 0; i < 100; ++i) {
@@ -177,25 +178,25 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
String.format("insert into root.db.d2(time, s) values (%s, 1)",
currentTime + i));
}
session.executeNonQueryStatement("flush");
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- // create topic on sender
+ // Create topic on sender
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
- try (SubscriptionSession session = new SubscriptionSession(host, port)) {
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
session.open();
- Properties config = new Properties();
+ final Properties config = new Properties();
config.put(TopicConstant.START_TIME_KEY, currentTime);
session.createTopic("topic1", config);
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- // subscribe on sender and insert on receiver
+ // Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
@@ -214,7 +215,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
@@ -233,24 +234,24 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
consumer.commitSync(messages);
}
consumer.unsubscribe("topic1");
- LOGGER.info(
- "consumer {} (group {}) exiting...",
- consumer.getConsumerId(),
- consumer.getConsumerGroupId());
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
- // avoid fail
+ // Avoid fail
+ } finally {
+ LOGGER.info("consumer exiting...");
}
});
thread.start();
- // check data on receiver
+ // Check data on receiver
try {
try (final Connection connection = receiverEnv.getConnection();
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
- .atMost(100, TimeUnit.SECONDS)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertSingleResultSetEqual(
@@ -261,7 +262,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
}
}));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
@@ -277,12 +278,12 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
session.executeNonQueryStatement(
"insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2),
(2000, 3), (2500, 4), (3000, 5)");
session.executeNonQueryStatement("flush");
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- // create topic
+ // Create topic
final String host = senderEnv.getIP();
final int port = Integer.parseInt(senderEnv.getPort());
try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
@@ -292,12 +293,12 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
config.put("processor.tumbling-time.interval-seconds", "1");
config.put("processor.down-sampling.split-file", "true");
session.createTopic("topic1", config);
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- // subscribe on sender and insert on receiver
+ // Subscribe on sender and insert on receiver
final AtomicBoolean isClosed = new AtomicBoolean(false);
final Thread thread =
new Thread(
@@ -316,7 +317,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
while (!isClosed.get()) {
try {
Thread.sleep(1000); // wait some time
- } catch (InterruptedException e) {
+ } catch (final InterruptedException e) {
break;
}
final List<SubscriptionMessage> messages =
@@ -335,18 +336,16 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
consumer.commitSync(messages);
}
consumer.unsubscribe("topic1");
- LOGGER.info(
- "consumer {} (group {}) exiting...",
- consumer.getConsumerId(),
- consumer.getConsumerGroupId());
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
- // avoid fail
+ // Avoid fail
+ } finally {
+ LOGGER.info("consumer exiting...");
}
});
thread.start();
- // check data on receiver
+ // Check data on receiver
final Set<String> expectedResSet = new HashSet<>();
expectedResSet.add("1000,1.0,");
expectedResSet.add("2000,3.0,");
@@ -356,7 +355,9 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
final Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
Awaitility.await()
- .atMost(100, TimeUnit.SECONDS)
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
.untilAsserted(
() ->
TestUtils.assertResultSetEqual(
@@ -364,7 +365,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
"Time,root.db.d1.at1,",
expectedResSet));
}
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
new file mode 100644
index 00000000000..fc175adf97d
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBSubscriptionBasicIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testBasicSubscription() throws Exception {
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Create topic
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ session.createTopic("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final AtomicInteger rowCount = new AtomicInteger();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe("topic1");
+ while (!isClosed.get()) {
+ try {
+ Thread.sleep(1000); // wait some time
+ } catch (final InterruptedException e) {
+ break;
+ }
+ final List<SubscriptionMessage> messages =
+ consumer.poll(Duration.ofMillis(10000));
+ if (messages.isEmpty()) {
+ continue;
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionSessionDataSets payload =
+ (SubscriptionSessionDataSets) message.getPayload();
+ for (final SubscriptionSessionDataSet dataSet : payload) {
+ while (dataSet.hasNext()) {
+ dataSet.next();
+ rowCount.addAndGet(1);
+ }
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ consumer.unsubscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // avoid fail
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Check row count
+ try {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
new file mode 100644
index 00000000000..4912c2d1f68
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBSubscriptionIdempotentIT {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBSubscriptionIdempotentIT.class);
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testSubscribeOrUnsubscribeNonExistedTopicTest() {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ // Subscribe non-existed topic
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe("topic1");
+ fail();
+ } catch (final Exception ignored) {
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+
+ // Unsubscribe non-existed topic
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.unsubscribe("topic1");
+ fail();
+ } catch (final Exception ignored) {
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ }
+
+ @Test
+ public void testSubscribeExistedSubscribedTopicTest() {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ // Create topic
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ session.createTopic("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ consumer.subscribe("topic1");
+ // Subscribe existed subscribed topic
+ consumer.subscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ }
+
+ @Test
+ public void testUnsubscribeExistedNonSubscribedTopicTest() {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ // Create topic
+ try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
+ session.open();
+ session.createTopic("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
+
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ // unsubscribe existed non-subscribed topic
+ consumer.unsubscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionBasicIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
similarity index 55%
rename from integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionBasicIT.java
rename to integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 22051560151..6f909d5dc42 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionBasicIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.local;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -37,6 +37,7 @@ import
org.apache.iotdb.session.subscription.SubscriptionSession;
import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
+import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -47,17 +48,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
-public class IoTDBSubscriptionBasicIT {
+public class IoTDBSubscriptionRestartIT {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
-
- private static final long MAX_RETRY_COUNT = 30;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);
@Before
public void setUp() throws Exception {
@@ -70,106 +73,20 @@ public class IoTDBSubscriptionBasicIT {
}
@Test
- public void testSimpleSubscription() {
- // Insert some historical data
- try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
- for (int i = 0; i < 100; ++i) {
- session.executeNonQueryStatement(
- String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
- }
- session.executeNonQueryStatement("flush");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- // Create topic
- String host = EnvFactory.getEnv().getIP();
- int port = Integer.parseInt(EnvFactory.getEnv().getPort());
- try (SubscriptionSession session = new SubscriptionSession(host, port)) {
- session.open();
- session.createTopic("topic1");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- // Subscription
- int count = 0;
- long retryCount = 0;
- try (final SubscriptionPullConsumer consumer =
- new SubscriptionPullConsumer.Builder()
- .host(host)
- .port(port)
- .consumerId("c1")
- .consumerGroupId("cg1")
- .autoCommit(false)
- .buildPullConsumer()) {
- consumer.open();
- consumer.subscribe("topic1");
- while (true) {
- Thread.sleep(1000 * retryCount); // wait some time
- final List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(10000));
- if (messages.isEmpty()) {
- if (retryCount >= MAX_RETRY_COUNT) {
- break;
- }
- retryCount += 1;
- continue;
- }
- for (final SubscriptionMessage message : messages) {
- final SubscriptionSessionDataSets payload =
- (SubscriptionSessionDataSets) message.getPayload();
- for (final SubscriptionSessionDataSet dataSet : payload) {
- while (dataSet.hasNext()) {
- dataSet.next();
- count += 1;
- }
- }
- }
- consumer.commitSync(messages);
- }
- consumer.unsubscribe("topic1");
- LOGGER.info(
- "consumer {} (group {}) exiting...",
- consumer.getConsumerId(),
- consumer.getConsumerGroupId());
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- Assert.assertEquals(100, count);
- }
-
- @Test
- public void testRestartSubscription() throws Exception {
- // Insert some historical data
- try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
- for (int i = 0; i < 100; ++i) {
- session.executeNonQueryStatement(
- String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
- }
- session.executeNonQueryStatement("flush");
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
+ public void testSubscriptionAfterRestart() throws Exception {
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
// Create topic
- String host = EnvFactory.getEnv().getIP();
- int port = Integer.parseInt(EnvFactory.getEnv().getPort());
- try (SubscriptionSession session = new SubscriptionSession(host, port)) {
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
session.open();
session.createTopic("topic1");
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
// Subscription
- int count = 0;
- long retryCount = 0;
try {
final SubscriptionPullConsumer consumer =
new SubscriptionPullConsumer.Builder()
@@ -181,36 +98,11 @@ public class IoTDBSubscriptionBasicIT {
.buildPullConsumer();
consumer.open();
consumer.subscribe("topic1");
- while (true) {
- Thread.sleep(1000 * retryCount); // wait some time
- final List<SubscriptionMessage> messages =
consumer.poll(Duration.ofMillis(10000));
- if (messages.isEmpty()) {
- if (retryCount >= MAX_RETRY_COUNT) {
- break;
- }
- retryCount += 1;
- continue;
- }
- for (final SubscriptionMessage message : messages) {
- final SubscriptionSessionDataSets payload =
- (SubscriptionSessionDataSets) message.getPayload();
- for (final SubscriptionSessionDataSet dataSet : payload) {
- while (dataSet.hasNext()) {
- dataSet.next();
- count += 1;
- }
- }
- }
- consumer.commitSync(messages);
- }
- // We do not unsubscribe topic and close consumer here
- } catch (Exception e) {
+ } catch (final Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
- Assert.assertEquals(100, count);
-
// Restart cluster
TestUtils.restartCluster(EnvFactory.getEnv());
@@ -228,5 +120,81 @@ public class IoTDBSubscriptionBasicIT {
Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
Assert.assertEquals(1, showSubscriptionResp.subscriptionInfoList.size());
}
+
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription again
+ final Map<Long, Long> timestamps = new HashMap<>();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(false)
+ .buildPullConsumer()) {
+ consumer.open();
+ while (!isClosed.get()) {
+ try {
+ Thread.sleep(1000); // wait some time
+ } catch (final InterruptedException e) {
+ break;
+ }
+ final List<SubscriptionMessage> messages =
+ consumer.poll(Duration.ofMillis(10000));
+ if (messages.isEmpty()) {
+ continue;
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionSessionDataSets payload =
+ (SubscriptionSessionDataSets) message.getPayload();
+ for (final SubscriptionSessionDataSet dataSet : payload) {
+ while (dataSet.hasNext()) {
+ final long timestamp = dataSet.next().getTimestamp();
+ timestamps.put(timestamp, timestamp);
+ }
+ }
+ }
+ consumer.commitSync(messages);
+ }
+ consumer.unsubscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // avoid fail
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Check timestamps size
+ try {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 85494fe965b..8346277e2b5 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -100,7 +100,8 @@ public class SubscriptionSession extends Session {
executeNonQueryStatement(sql);
}
- public void drop(String topicName) throws IoTDBConnectionException,
StatementExecutionException {
+ public void dropTopic(String topicName)
+ throws IoTDBConnectionException, StatementExecutionException {
final String sql = String.format("DROP TOPIC %s", topicName);
executeNonQueryStatement(sql);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
index 769a7bbee06..4b3808f1a09 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
@@ -109,6 +109,6 @@ public class SerializedEnrichedEvent {
// Recycle events that may not be able to be committed, i.e., those that
have been polled but
// not committed within a certain period of time.
return System.currentTimeMillis() - lastPolledTimestamp
- >
SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalSeconds();
+ >
SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs();
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8330fb75b45..90610bbcb53 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -231,7 +231,7 @@ public class CommonConfig {
private int subscriptionPollMaxBlockingTimeMs = 500;
private int subscriptionSerializeMaxBlockingTimeMs = 100;
private long subscriptionLaunchRetryIntervalMs = 1000;
- private int subscriptionRecycleUncommittedEventIntervalSeconds = 240;
+ private int subscriptionRecycleUncommittedEventIntervalMs = 240000; // 240s
private long subscriptionDefaultPollTimeoutMs = 30000;
private long subscriptionMinPollTimeoutMs = 500;
@@ -988,14 +988,14 @@ public class CommonConfig {
this.subscriptionLaunchRetryIntervalMs = subscriptionLaunchRetryIntervalMs;
}
- public int getSubscriptionRecycleUncommittedEventIntervalSeconds() {
- return subscriptionRecycleUncommittedEventIntervalSeconds;
+ public int getSubscriptionRecycleUncommittedEventIntervalMs() {
+ return subscriptionRecycleUncommittedEventIntervalMs;
}
- public void setSubscriptionRecycleUncommittedEventIntervalSeconds(
- int subscriptionRecycleUncommittedEventIntervalSeconds) {
- this.subscriptionRecycleUncommittedEventIntervalSeconds =
- subscriptionRecycleUncommittedEventIntervalSeconds;
+ public void setSubscriptionRecycleUncommittedEventIntervalMs(
+ int subscriptionRecycleUncommittedEventIntervalMs) {
+ this.subscriptionRecycleUncommittedEventIntervalMs =
+ subscriptionRecycleUncommittedEventIntervalMs;
}
public long getSubscriptionDefaultPollTimeoutMs() {
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 213b0b171d8..3a9e2017902 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -542,11 +542,11 @@ public class CommonDescriptor {
properties.getProperty(
"subscription_launch_retry_interval_ms",
String.valueOf(config.getSubscriptionLaunchRetryIntervalMs()))));
- config.setSubscriptionRecycleUncommittedEventIntervalSeconds(
+ config.setSubscriptionRecycleUncommittedEventIntervalMs(
Integer.parseInt(
properties.getProperty(
- "subscription_recycle_uncommitted_event_interval_seconds",
-
String.valueOf(config.getSubscriptionRecycleUncommittedEventIntervalSeconds()))));
+ "subscription_recycle_uncommitted_event_interval_ms",
+
String.valueOf(config.getSubscriptionRecycleUncommittedEventIntervalMs()))));
config.setSubscriptionDefaultPollTimeoutMs(
Integer.parseInt(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 35026b55ca4..a6ba77b34ca 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -51,8 +51,8 @@ public class SubscriptionConfig {
return COMMON_CONFIG.getSubscriptionLaunchRetryIntervalMs();
}
- public int getSubscriptionRecycleUncommittedEventIntervalSeconds() {
- return
COMMON_CONFIG.getSubscriptionRecycleUncommittedEventIntervalSeconds();
+ public int getSubscriptionRecycleUncommittedEventIntervalMs() {
+ return COMMON_CONFIG.getSubscriptionRecycleUncommittedEventIntervalMs();
}
public long getSubscriptionDefaultPollTimeoutMs() {
@@ -78,8 +78,8 @@ public class SubscriptionConfig {
"SubscriptionSerializeMaxBlockingTimeMs: {}",
getSubscriptionSerializeMaxBlockingTimeMs());
LOGGER.info("SubscriptionLaunchRetryIntervalMs: {}",
getSubscriptionLaunchRetryIntervalMs());
LOGGER.info(
- "SubscriptionRecycleUncommittedEventIntervalSeconds: {}",
- getSubscriptionRecycleUncommittedEventIntervalSeconds());
+ "SubscriptionRecycleUncommittedEventIntervalMs: {}",
+ getSubscriptionRecycleUncommittedEventIntervalMs());
LOGGER.info("SubscriptionDefaultPollTimeoutMs: {}",
getSubscriptionDefaultPollTimeoutMs());
LOGGER.info("SubscriptionMinPollTimeoutMs: {}",
getSubscriptionMinPollTimeoutMs());
}