This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cd6ba732a42 Subscription: fix some issues on DN and session & improve 
IT (#12254)
cd6ba732a42 is described below

commit cd6ba732a428585d9036121c97ca272b0fc2d3c5
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Mar 29 23:21:41 2024 +0800

    Subscription: fix some issues on DN and session & improve IT (#12254)
---
 .../it/{ => dual}/AbstractSubscriptionDualIT.java  |   2 +-
 .../IoTDBSubscriptionConsumerGroupIT.java          |  58 +++---
 .../it/{ => dual}/IoTDBSubscriptionTopicIT.java    | 113 +++++------
 .../it/local/IoTDBSubscriptionBasicIT.java         | 157 +++++++++++++++
 .../it/local/IoTDBSubscriptionIdempotentIT.java    | 162 ++++++++++++++++
 .../IoTDBSubscriptionRestartIT.java}               | 212 +++++++++------------
 .../session/subscription/SubscriptionSession.java  |   3 +-
 .../broker/SerializedEnrichedEvent.java            |   2 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  14 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |   6 +-
 .../subscription/config/SubscriptionConfig.java    |   8 +-
 11 files changed, 515 insertions(+), 222 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionDualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
similarity index 97%
rename from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionDualIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 46e537cd7f6..6f2dbd41230 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/AbstractSubscriptionDualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.dual;
 
 import org.apache.iotdb.it.env.MultiEnvFactory;
 import org.apache.iotdb.itbase.env.BaseEnv;
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
similarity index 93%
rename from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
index 63059492eb0..54d3867c99c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionConsumerGroupIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionConsumerGroupIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.dual;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
@@ -68,7 +68,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   public void test3C1CGSubscribeOneTopicHistoricalData() throws Exception {
     final long currentTime = System.currentTimeMillis();
 
-    // history data
+    // Historical data
     insertData(currentTime);
 
     createTopics(currentTime);
@@ -93,7 +93,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   public void test3C3CGSubscribeOneTopicHistoricalData() throws Exception {
     final long currentTime = System.currentTimeMillis();
 
-    // history data
+    // Historical data
     insertData(currentTime);
 
     createTopics(currentTime);
@@ -120,7 +120,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   public void test3C1CGSubscribeTwoTopicHistoricalData() throws Exception {
     final long currentTime = System.currentTimeMillis();
 
-    // history data
+    // Historical data
     insertData(currentTime);
 
     createTopics(currentTime);
@@ -146,7 +146,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   public void test3C3CGSubscribeTwoTopicHistoricalData() throws Exception {
     final long currentTime = System.currentTimeMillis();
 
-    // history data
+    // Historical data
     insertData(currentTime);
 
     createTopics(currentTime);
@@ -174,7 +174,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   public void test4C2CGSubscribeTwoTopicHistoricalData() throws Exception {
     final long currentTime = System.currentTimeMillis();
 
-    // history data
+    // Historical data
     insertData(currentTime);
 
     createTopics(currentTime);
@@ -213,7 +213,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
     consumers.add(createConsumerAndSubscribeTopics("c2", "cg1", "topic1"));
     consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
 
-    // realtime data
+    // Realtime data
     insertData(currentTime);
 
     pollMessagesAndCheck(
@@ -320,7 +320,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
     consumers.add(createConsumerAndSubscribeTopics("c3", "cg1", "topic1"));
     consumers.add(createConsumerAndSubscribeTopics("c4", "cg2", "topic2"));
 
-    // realtime data
+    // Realtime data
     insertData(currentTime);
 
     pollMessagesAndCheck(
@@ -338,22 +338,22 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
 
   /////////////////////////////// utility ///////////////////////////////
 
-  private void createTopics(long currentTime) {
+  private void createTopics(final long currentTime) {
     // Create topics on sender
     try (final ISession session = senderEnv.getSessionConnection()) {
       session.executeNonQueryStatement(
           String.format("create topic topic1 with ('end-time'='%s')", 
currentTime - 1));
       session.executeNonQueryStatement(
           String.format("create topic topic2 with ('start-time'='%s')", 
currentTime));
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
   }
 
-  private void insertData(long currentTime) {
+  private void insertData(final long currentTime) {
     // Insert some data on sender
-    try (ISession session = senderEnv.getSessionConnection()) {
+    try (final ISession session = senderEnv.getSessionConnection()) {
       for (int i = 0; i < 100; ++i) {
         session.executeNonQueryStatement(
             String.format("insert into root.topic1(time, s) values (%s, 1)", 
i)); // topic1
@@ -362,13 +362,13 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                 "insert into root.topic2(time, s) values (%s, 1)", currentTime 
+ i)); // topic2
       }
       session.executeNonQueryStatement("flush");
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
   }
 
-  private void createPipes(long currentTime) {
+  private void createPipes(final long currentTime) {
     // For sync reference
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
@@ -390,7 +390,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                   .setExtractorAttributes(extractorAttributes)
                   .setProcessorAttributes(processorAttributes));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
@@ -415,14 +415,15 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                   .setExtractorAttributes(extractorAttributes)
                   .setProcessorAttributes(processorAttributes));
       Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.getCode());
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
   }
 
   private SubscriptionPullConsumer createConsumerAndSubscribeTopics(
-      String consumerId, String consumerGroupId, String... topicNames) throws 
Exception {
+      final String consumerId, final String consumerGroupId, final String... 
topicNames)
+      throws Exception {
     final SubscriptionPullConsumer consumer =
         new SubscriptionPullConsumer.Builder()
             .host(senderEnv.getIP())
@@ -437,7 +438,8 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
   }
 
   private void pollMessagesAndCheck(
-      List<SubscriptionPullConsumer> consumers, Map<String, String> 
expectedHeaderWithResult)
+      final List<SubscriptionPullConsumer> consumers,
+      final Map<String, String> expectedHeaderWithResult)
       throws Exception {
     final AtomicBoolean isClosed = new AtomicBoolean(false);
     final AtomicBoolean receiverCrashed = new AtomicBoolean(false);
@@ -447,14 +449,14 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
       final int index = i;
       final String consumerId = consumers.get(index).getConsumerId();
       final String consumerGroupId = consumers.get(index).getConsumerGroupId();
-      Thread t =
+      final Thread t =
           new Thread(
               () -> {
                 try (final SubscriptionPullConsumer consumer = 
consumers.get(index)) {
                   while (!isClosed.get()) {
                     try {
                       Thread.sleep(1000); // wait some time
-                    } catch (InterruptedException e) {
+                    } catch (final InterruptedException e) {
                       break;
                     }
                     final List<SubscriptionMessage> messages =
@@ -480,14 +482,15 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                     consumer.commitSync(messages);
                   }
                   // No need to unsubscribe
-                } catch (Exception e) {
+                } catch (final Exception e) {
                   e.printStackTrace();
                   // Avoid failure
                 } finally {
-                  LOGGER.info("consumer {} (group {}) exiting...", consumerId, 
consumerGroupId);
+                  LOGGER.info(
+                      "consumer {} (consumer group {}) exiting...", 
consumerId, consumerGroupId);
                 }
               },
-              String.format("%s_%s", consumerGroupId, consumerId));
+              String.format("%s_%s", consumerId, consumerGroupId));
       t.start();
       threads.add(t);
     }
@@ -512,12 +515,12 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
                       expectedHeaderWithResult);
                 });
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     } finally {
       isClosed.set(true);
-      for (Thread thread : threads) {
+      for (final Thread thread : threads) {
         thread.join();
       }
     }
@@ -525,7 +528,8 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
 
   /** @return false -> receiver crashed */
   private boolean insertRowRecordEnrichedByConsumerGroupId(
-      List<String> columnNameList, RowRecord record, String consumerGroupId) 
throws Exception {
+      final List<String> columnNameList, final RowRecord record, final String 
consumerGroupId)
+      throws Exception {
     if (columnNameList.size() != 2) {
       LOGGER.warn("unexpected column name list: {}", columnNameList);
       throw new Exception("unexpected column name list");
@@ -545,7 +549,7 @@ public class IoTDBSubscriptionConsumerGroupIT extends 
AbstractSubscriptionDualIT
       return TestUtils.tryExecuteNonQueryWithRetry(receiverEnv, sql);
     } else {
       LOGGER.warn("unexpected column name: {}", columnName);
-      throw new Exception("unexpected column name list");
+      throw new Exception("unexpected column name");
     }
   }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
similarity index 82%
rename from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionTopicIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 5c861a6c639..e345c7232a3 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.dual;
 
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.isession.ISession;
@@ -59,7 +59,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
 
   @Test
   public void testTopicPathSubscription() throws Exception {
-    // insert some history data on sender
+    // Insert some historical data on sender
     try (final ISession session = senderEnv.getSessionConnection()) {
       for (int i = 0; i < 100; ++i) {
         session.executeNonQueryStatement(
@@ -72,30 +72,30 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
             String.format("insert into root.db.t1(time, s1) values (%s, 1)", 
i));
       }
       session.executeNonQueryStatement("flush");
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    // create topic on sender
+    // Create topic on sender
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
-    try (SubscriptionSession session = new SubscriptionSession(host, port)) {
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      Properties config = new Properties();
+      final Properties config = new Properties();
       config.put(TopicConstant.PATH_KEY, "root.db.*.s");
       session.createTopic("topic1", config);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    // subscribe on sender and insert on receiver
+    // Subscribe on sender and insert on receiver
     final AtomicBoolean isClosed = new AtomicBoolean(false);
     final Thread thread =
         new Thread(
             () -> {
-              try (SubscriptionPullConsumer consumer =
+              try (final SubscriptionPullConsumer consumer =
                       new SubscriptionPullConsumer.Builder()
                           .host(host)
                           .port(port)
@@ -103,48 +103,49 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                           .consumerGroupId("cg1")
                           .autoCommit(false)
                           .buildPullConsumer();
-                  ISession session = receiverEnv.getSessionConnection()) {
+                  final ISession session = receiverEnv.getSessionConnection()) 
{
                 consumer.open();
                 consumer.subscribe("topic1");
                 while (!isClosed.get()) {
                   try {
                     Thread.sleep(1000); // wait some time
-                  } catch (InterruptedException e) {
+                  } catch (final InterruptedException e) {
                     break;
                   }
-                  List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(10000));
+                  final List<SubscriptionMessage> messages =
+                      consumer.poll(Duration.ofMillis(10000));
                   if (messages.isEmpty()) {
                     continue;
                   }
-                  for (SubscriptionMessage message : messages) {
-                    SubscriptionSessionDataSets payload =
+                  for (final SubscriptionMessage message : messages) {
+                    final SubscriptionSessionDataSets payload =
                         (SubscriptionSessionDataSets) message.getPayload();
-                    for (Iterator<Tablet> it = payload.tabletIterator(); 
it.hasNext(); ) {
-                      Tablet tablet = it.next();
+                    for (final Iterator<Tablet> it = payload.tabletIterator(); 
it.hasNext(); ) {
+                      final Tablet tablet = it.next();
                       session.insertTablet(tablet);
                     }
                   }
                   consumer.commitSync(messages);
                 }
                 consumer.unsubscribe("topic1");
-                LOGGER.info(
-                    "consumer {} (group {}) exiting...",
-                    consumer.getConsumerId(),
-                    consumer.getConsumerGroupId());
-              } catch (Exception e) {
+              } catch (final Exception e) {
                 e.printStackTrace();
-                // avoid fail
+                // Avoid fail
+              } finally {
+                LOGGER.info("consumer exiting...");
               }
             });
     thread.start();
 
-    // check data on receiver
+    // Check data on receiver
     try {
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
         Awaitility.await()
-            .atMost(100, TimeUnit.SECONDS)
+            .pollDelay(1, TimeUnit.SECONDS)
+            .pollInterval(1, TimeUnit.SECONDS)
+            .atMost(120, TimeUnit.SECONDS)
             .untilAsserted(
                 () ->
                     TestUtils.assertSingleResultSetEqual(
@@ -156,7 +157,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                           }
                         }));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     } finally {
@@ -167,7 +168,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
 
   @Test
   public void testTopicTimeSubscription() throws Exception {
-    // insert some history data on sender
+    // Insert some historical data on sender
     final long currentTime = System.currentTimeMillis();
     try (final ISession session = senderEnv.getSessionConnection()) {
       for (int i = 0; i < 100; ++i) {
@@ -177,25 +178,25 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
             String.format("insert into root.db.d2(time, s) values (%s, 1)", 
currentTime + i));
       }
       session.executeNonQueryStatement("flush");
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    // create topic on sender
+    // Create topic on sender
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
-    try (SubscriptionSession session = new SubscriptionSession(host, port)) {
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
-      Properties config = new Properties();
+      final Properties config = new Properties();
       config.put(TopicConstant.START_TIME_KEY, currentTime);
       session.createTopic("topic1", config);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    // subscribe on sender and insert on receiver
+    // Subscribe on sender and insert on receiver
     final AtomicBoolean isClosed = new AtomicBoolean(false);
     final Thread thread =
         new Thread(
@@ -214,7 +215,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                 while (!isClosed.get()) {
                   try {
                     Thread.sleep(1000); // wait some time
-                  } catch (InterruptedException e) {
+                  } catch (final InterruptedException e) {
                     break;
                   }
                   final List<SubscriptionMessage> messages =
@@ -233,24 +234,24 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   consumer.commitSync(messages);
                 }
                 consumer.unsubscribe("topic1");
-                LOGGER.info(
-                    "consumer {} (group {}) exiting...",
-                    consumer.getConsumerId(),
-                    consumer.getConsumerGroupId());
-              } catch (Exception e) {
+              } catch (final Exception e) {
                 e.printStackTrace();
-                // avoid fail
+                // Avoid fail
+              } finally {
+                LOGGER.info("consumer exiting...");
               }
             });
     thread.start();
 
-    // check data on receiver
+    // Check data on receiver
     try {
       try (final Connection connection = receiverEnv.getConnection();
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
         Awaitility.await()
-            .atMost(100, TimeUnit.SECONDS)
+            .pollDelay(1, TimeUnit.SECONDS)
+            .pollInterval(1, TimeUnit.SECONDS)
+            .atMost(120, TimeUnit.SECONDS)
             .untilAsserted(
                 () ->
                     TestUtils.assertSingleResultSetEqual(
@@ -261,7 +262,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                           }
                         }));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     } finally {
@@ -277,12 +278,12 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       session.executeNonQueryStatement(
           "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2), 
(2000, 3), (2500, 4), (3000, 5)");
       session.executeNonQueryStatement("flush");
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    // create topic
+    // Create topic
     final String host = senderEnv.getIP();
     final int port = Integer.parseInt(senderEnv.getPort());
     try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
@@ -292,12 +293,12 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
       config.put("processor.tumbling-time.interval-seconds", "1");
       config.put("processor.down-sampling.split-file", "true");
       session.createTopic("topic1", config);
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    // subscribe on sender and insert on receiver
+    // Subscribe on sender and insert on receiver
     final AtomicBoolean isClosed = new AtomicBoolean(false);
     final Thread thread =
         new Thread(
@@ -316,7 +317,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                 while (!isClosed.get()) {
                   try {
                     Thread.sleep(1000); // wait some time
-                  } catch (InterruptedException e) {
+                  } catch (final InterruptedException e) {
                     break;
                   }
                   final List<SubscriptionMessage> messages =
@@ -335,18 +336,16 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                   consumer.commitSync(messages);
                 }
                 consumer.unsubscribe("topic1");
-                LOGGER.info(
-                    "consumer {} (group {}) exiting...",
-                    consumer.getConsumerId(),
-                    consumer.getConsumerGroupId());
-              } catch (Exception e) {
+              } catch (final Exception e) {
                 e.printStackTrace();
-                // avoid fail
+                // Avoid fail
+              } finally {
+                LOGGER.info("consumer exiting...");
               }
             });
     thread.start();
 
-    // check data on receiver
+    // Check data on receiver
     final Set<String> expectedResSet = new HashSet<>();
     expectedResSet.add("1000,1.0,");
     expectedResSet.add("2000,3.0,");
@@ -356,7 +355,9 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
           final Statement statement = connection.createStatement()) {
         // Keep retrying if there are execution failures
         Awaitility.await()
-            .atMost(100, TimeUnit.SECONDS)
+            .pollDelay(1, TimeUnit.SECONDS)
+            .pollInterval(1, TimeUnit.SECONDS)
+            .atMost(120, TimeUnit.SECONDS)
             .untilAsserted(
                 () ->
                     TestUtils.assertResultSetEqual(
@@ -364,7 +365,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                         "Time,root.db.d1.at1,",
                         expectedResSet));
       }
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     } finally {
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
new file mode 100644
index 00000000000..fc175adf97d
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionMessage;
+import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBSubscriptionBasicIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testBasicSubscription() throws Exception {
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      session.createTopic("topic1");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final AtomicInteger rowCount = new AtomicInteger();
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                  new SubscriptionPullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .autoCommit(false)
+                      .buildPullConsumer()) {
+                consumer.open();
+                consumer.subscribe("topic1");
+                while (!isClosed.get()) {
+                  try {
+                    Thread.sleep(1000); // wait some time
+                  } catch (final InterruptedException e) {
+                    break;
+                  }
+                  final List<SubscriptionMessage> messages =
+                      consumer.poll(Duration.ofMillis(10000));
+                  if (messages.isEmpty()) {
+                    continue;
+                  }
+                  for (final SubscriptionMessage message : messages) {
+                    final SubscriptionSessionDataSets payload =
+                        (SubscriptionSessionDataSets) message.getPayload();
+                    for (final SubscriptionSessionDataSet dataSet : payload) {
+                      while (dataSet.hasNext()) {
+                        dataSet.next();
+                        rowCount.addAndGet(1);
+                      }
+                    }
+                  }
+                  consumer.commitSync(messages);
+                }
+                consumer.unsubscribe("topic1");
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // avoid fail
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            });
+    thread.start();
+
+    // Check row count
+    try {
+      // Keep retrying if there are execution failures
+      Awaitility.await()
+          .pollDelay(1, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .atMost(120, TimeUnit.SECONDS)
+          .untilAsserted(() -> Assert.assertEquals(100, rowCount.get()));
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
new file mode 100644
index 00000000000..4912c2d1f68
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.subscription.it.local;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.SubscriptionSession;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBSubscriptionIdempotentIT {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionIdempotentIT.class);
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testSubscribeOrUnsubscribeNonExistedTopicTest() {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    // Subscribe non-existed topic
+    try (final SubscriptionPullConsumer consumer =
+        new SubscriptionPullConsumer.Builder()
+            .host(host)
+            .port(port)
+            .consumerId("c1")
+            .consumerGroupId("cg1")
+            .autoCommit(false)
+            .buildPullConsumer()) {
+      consumer.open();
+      consumer.subscribe("topic1");
+      fail();
+    } catch (final Exception ignored) {
+    } finally {
+      LOGGER.info("consumer exiting...");
+    }
+
+    // Unsubscribe non-existed topic
+    try (final SubscriptionPullConsumer consumer =
+        new SubscriptionPullConsumer.Builder()
+            .host(host)
+            .port(port)
+            .consumerId("c1")
+            .consumerGroupId("cg1")
+            .autoCommit(false)
+            .buildPullConsumer()) {
+      consumer.open();
+      consumer.unsubscribe("topic1");
+      fail();
+    } catch (final Exception ignored) {
+    } finally {
+      LOGGER.info("consumer exiting...");
+    }
+  }
+
+  @Test
+  public void testSubscribeExistedSubscribedTopicTest() {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    // Create topic
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      session.createTopic("topic1");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    try (final SubscriptionPullConsumer consumer =
+        new SubscriptionPullConsumer.Builder()
+            .host(host)
+            .port(port)
+            .consumerId("c1")
+            .consumerGroupId("cg1")
+            .autoCommit(false)
+            .buildPullConsumer()) {
+      consumer.open();
+      consumer.subscribe("topic1");
+      // Subscribe existed subscribed topic
+      consumer.subscribe("topic1");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      LOGGER.info("consumer exiting...");
+    }
+  }
+
+  @Test
+  public void testUnsubscribeExistedNonSubscribedTopicTest() {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+    // Create topic
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      session.createTopic("topic1");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    try (final SubscriptionPullConsumer consumer =
+        new SubscriptionPullConsumer.Builder()
+            .host(host)
+            .port(port)
+            .consumerId("c1")
+            .consumerGroupId("cg1")
+            .autoCommit(false)
+            .buildPullConsumer()) {
+      consumer.open();
+      // unsubscribe existed non-subscribed topic
+      consumer.unsubscribe("topic1");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } finally {
+      LOGGER.info("consumer exiting...");
+    }
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
similarity index 55%
rename from 
integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionBasicIT.java
rename to 
integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 22051560151..6f909d5dc42 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.subscription.it;
+package org.apache.iotdb.subscription.it.local;
 
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -37,6 +37,7 @@ import 
org.apache.iotdb.session.subscription.SubscriptionSession;
 import org.apache.iotdb.session.subscription.SubscriptionSessionDataSet;
 import org.apache.iotdb.session.subscription.SubscriptionSessionDataSets;
 
+import org.awaitility.Awaitility;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -47,17 +48,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({LocalStandaloneIT.class, ClusterIT.class})
-public class IoTDBSubscriptionBasicIT {
+public class IoTDBSubscriptionRestartIT {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
-
-  private static final long MAX_RETRY_COUNT = 30;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);
 
   @Before
   public void setUp() throws Exception {
@@ -70,106 +73,20 @@ public class IoTDBSubscriptionBasicIT {
   }
 
   @Test
-  public void testSimpleSubscription() {
-    // Insert some historical data
-    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      for (int i = 0; i < 100; ++i) {
-        session.executeNonQueryStatement(
-            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
-      }
-      session.executeNonQueryStatement("flush");
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-
-    // Create topic
-    String host = EnvFactory.getEnv().getIP();
-    int port = Integer.parseInt(EnvFactory.getEnv().getPort());
-    try (SubscriptionSession session = new SubscriptionSession(host, port)) {
-      session.open();
-      session.createTopic("topic1");
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-
-    // Subscription
-    int count = 0;
-    long retryCount = 0;
-    try (final SubscriptionPullConsumer consumer =
-        new SubscriptionPullConsumer.Builder()
-            .host(host)
-            .port(port)
-            .consumerId("c1")
-            .consumerGroupId("cg1")
-            .autoCommit(false)
-            .buildPullConsumer()) {
-      consumer.open();
-      consumer.subscribe("topic1");
-      while (true) {
-        Thread.sleep(1000 * retryCount); // wait some time
-        final List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(10000));
-        if (messages.isEmpty()) {
-          if (retryCount >= MAX_RETRY_COUNT) {
-            break;
-          }
-          retryCount += 1;
-          continue;
-        }
-        for (final SubscriptionMessage message : messages) {
-          final SubscriptionSessionDataSets payload =
-              (SubscriptionSessionDataSets) message.getPayload();
-          for (final SubscriptionSessionDataSet dataSet : payload) {
-            while (dataSet.hasNext()) {
-              dataSet.next();
-              count += 1;
-            }
-          }
-        }
-        consumer.commitSync(messages);
-      }
-      consumer.unsubscribe("topic1");
-      LOGGER.info(
-          "consumer {} (group {}) exiting...",
-          consumer.getConsumerId(),
-          consumer.getConsumerGroupId());
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
-
-    Assert.assertEquals(100, count);
-  }
-
-  @Test
-  public void testRestartSubscription() throws Exception {
-    // Insert some historical data
-    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
-      for (int i = 0; i < 100; ++i) {
-        session.executeNonQueryStatement(
-            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
-      }
-      session.executeNonQueryStatement("flush");
-    } catch (Exception e) {
-      e.printStackTrace();
-      fail(e.getMessage());
-    }
+  public void testSubscriptionAfterRestart() throws Exception {
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
 
     // Create topic
-    String host = EnvFactory.getEnv().getIP();
-    int port = Integer.parseInt(EnvFactory.getEnv().getPort());
-    try (SubscriptionSession session = new SubscriptionSession(host, port)) {
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
       session.open();
       session.createTopic("topic1");
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
     // Subscription
-    int count = 0;
-    long retryCount = 0;
     try {
       final SubscriptionPullConsumer consumer =
           new SubscriptionPullConsumer.Builder()
@@ -181,36 +98,11 @@ public class IoTDBSubscriptionBasicIT {
               .buildPullConsumer();
       consumer.open();
       consumer.subscribe("topic1");
-      while (true) {
-        Thread.sleep(1000 * retryCount); // wait some time
-        final List<SubscriptionMessage> messages = 
consumer.poll(Duration.ofMillis(10000));
-        if (messages.isEmpty()) {
-          if (retryCount >= MAX_RETRY_COUNT) {
-            break;
-          }
-          retryCount += 1;
-          continue;
-        }
-        for (final SubscriptionMessage message : messages) {
-          final SubscriptionSessionDataSets payload =
-              (SubscriptionSessionDataSets) message.getPayload();
-          for (final SubscriptionSessionDataSet dataSet : payload) {
-            while (dataSet.hasNext()) {
-              dataSet.next();
-              count += 1;
-            }
-          }
-        }
-        consumer.commitSync(messages);
-      }
-      // We do not unsubscribe topic and close consumer here
-    } catch (Exception e) {
+    } catch (final Exception e) {
       e.printStackTrace();
       fail(e.getMessage());
     }
 
-    Assert.assertEquals(100, count);
-
     // Restart cluster
     TestUtils.restartCluster(EnvFactory.getEnv());
 
@@ -228,5 +120,81 @@ public class IoTDBSubscriptionBasicIT {
       Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
       Assert.assertEquals(1, showSubscriptionResp.subscriptionInfoList.size());
     }
+
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription again
+    final Map<Long, Long> timestamps = new HashMap<>();
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                  new SubscriptionPullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .autoCommit(false)
+                      .buildPullConsumer()) {
+                consumer.open();
+                while (!isClosed.get()) {
+                  try {
+                    Thread.sleep(1000); // wait some time
+                  } catch (final InterruptedException e) {
+                    break;
+                  }
+                  final List<SubscriptionMessage> messages =
+                      consumer.poll(Duration.ofMillis(10000));
+                  if (messages.isEmpty()) {
+                    continue;
+                  }
+                  for (final SubscriptionMessage message : messages) {
+                    final SubscriptionSessionDataSets payload =
+                        (SubscriptionSessionDataSets) message.getPayload();
+                    for (final SubscriptionSessionDataSet dataSet : payload) {
+                      while (dataSet.hasNext()) {
+                        final long timestamp = dataSet.next().getTimestamp();
+                        timestamps.put(timestamp, timestamp);
+                      }
+                    }
+                  }
+                  consumer.commitSync(messages);
+                }
+                consumer.unsubscribe("topic1");
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // avoid fail
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            });
+    thread.start();
+
+    // Check timestamps size
+    try {
+      // Keep retrying if there are execution failures
+      Awaitility.await()
+          .pollDelay(1, TimeUnit.SECONDS)
+          .pollInterval(1, TimeUnit.SECONDS)
+          .atMost(120, TimeUnit.SECONDS)
+          .untilAsserted(() -> Assert.assertEquals(100, timestamps.size()));
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
   }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 85494fe965b..8346277e2b5 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -100,7 +100,8 @@ public class SubscriptionSession extends Session {
     executeNonQueryStatement(sql);
   }
 
-  public void drop(String topicName) throws IoTDBConnectionException, 
StatementExecutionException {
+  public void dropTopic(String topicName)
+      throws IoTDBConnectionException, StatementExecutionException {
     final String sql = String.format("DROP TOPIC %s", topicName);
     executeNonQueryStatement(sql);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
index 769a7bbee06..4b3808f1a09 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SerializedEnrichedEvent.java
@@ -109,6 +109,6 @@ public class SerializedEnrichedEvent {
     // Recycle events that may not be able to be committed, i.e., those that 
have been polled but
     // not committed within a certain period of time.
     return System.currentTimeMillis() - lastPolledTimestamp
-        > 
SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalSeconds();
+        > 
SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs();
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 8330fb75b45..90610bbcb53 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -231,7 +231,7 @@ public class CommonConfig {
   private int subscriptionPollMaxBlockingTimeMs = 500;
   private int subscriptionSerializeMaxBlockingTimeMs = 100;
   private long subscriptionLaunchRetryIntervalMs = 1000;
-  private int subscriptionRecycleUncommittedEventIntervalSeconds = 240;
+  private int subscriptionRecycleUncommittedEventIntervalMs = 240000; // 240s
   private long subscriptionDefaultPollTimeoutMs = 30000;
   private long subscriptionMinPollTimeoutMs = 500;
 
@@ -988,14 +988,14 @@ public class CommonConfig {
     this.subscriptionLaunchRetryIntervalMs = subscriptionLaunchRetryIntervalMs;
   }
 
-  public int getSubscriptionRecycleUncommittedEventIntervalSeconds() {
-    return subscriptionRecycleUncommittedEventIntervalSeconds;
+  public int getSubscriptionRecycleUncommittedEventIntervalMs() {
+    return subscriptionRecycleUncommittedEventIntervalMs;
   }
 
-  public void setSubscriptionRecycleUncommittedEventIntervalSeconds(
-      int subscriptionRecycleUncommittedEventIntervalSeconds) {
-    this.subscriptionRecycleUncommittedEventIntervalSeconds =
-        subscriptionRecycleUncommittedEventIntervalSeconds;
+  public void setSubscriptionRecycleUncommittedEventIntervalMs(
+      int subscriptionRecycleUncommittedEventIntervalMs) {
+    this.subscriptionRecycleUncommittedEventIntervalMs =
+        subscriptionRecycleUncommittedEventIntervalMs;
   }
 
   public long getSubscriptionDefaultPollTimeoutMs() {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index 213b0b171d8..3a9e2017902 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -542,11 +542,11 @@ public class CommonDescriptor {
             properties.getProperty(
                 "subscription_launch_retry_interval_ms",
                 
String.valueOf(config.getSubscriptionLaunchRetryIntervalMs()))));
-    config.setSubscriptionRecycleUncommittedEventIntervalSeconds(
+    config.setSubscriptionRecycleUncommittedEventIntervalMs(
         Integer.parseInt(
             properties.getProperty(
-                "subscription_recycle_uncommitted_event_interval_seconds",
-                
String.valueOf(config.getSubscriptionRecycleUncommittedEventIntervalSeconds()))));
+                "subscription_recycle_uncommitted_event_interval_ms",
+                
String.valueOf(config.getSubscriptionRecycleUncommittedEventIntervalMs()))));
     config.setSubscriptionDefaultPollTimeoutMs(
         Integer.parseInt(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index 35026b55ca4..a6ba77b34ca 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -51,8 +51,8 @@ public class SubscriptionConfig {
     return COMMON_CONFIG.getSubscriptionLaunchRetryIntervalMs();
   }
 
-  public int getSubscriptionRecycleUncommittedEventIntervalSeconds() {
-    return 
COMMON_CONFIG.getSubscriptionRecycleUncommittedEventIntervalSeconds();
+  public int getSubscriptionRecycleUncommittedEventIntervalMs() {
+    return COMMON_CONFIG.getSubscriptionRecycleUncommittedEventIntervalMs();
   }
 
   public long getSubscriptionDefaultPollTimeoutMs() {
@@ -78,8 +78,8 @@ public class SubscriptionConfig {
         "SubscriptionSerializeMaxBlockingTimeMs: {}", 
getSubscriptionSerializeMaxBlockingTimeMs());
     LOGGER.info("SubscriptionLaunchRetryIntervalMs: {}", 
getSubscriptionLaunchRetryIntervalMs());
     LOGGER.info(
-        "SubscriptionRecycleUncommittedEventIntervalSeconds: {}",
-        getSubscriptionRecycleUncommittedEventIntervalSeconds());
+        "SubscriptionRecycleUncommittedEventIntervalMs: {}",
+        getSubscriptionRecycleUncommittedEventIntervalMs());
     LOGGER.info("SubscriptionDefaultPollTimeoutMs: {}", 
getSubscriptionDefaultPollTimeoutMs());
     LOGGER.info("SubscriptionMinPollTimeoutMs: {}", 
getSubscriptionMinPollTimeoutMs());
   }

Reply via email to