This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 93211e10489 Subscription: adapt pipe completion signal for automatically drop subscription (#12724)
93211e10489 is described below

commit 93211e104897b0091bb76f94ecfcc3a6f13331ff
Author: V_Galaxy <[email protected]>
AuthorDate: Fri Jun 14 13:05:24 2024 +0800

    Subscription: adapt pipe completion signal for automatically drop subscription (#12724)
---
 .../apache/iotdb/SubscriptionSessionExample.java   | 108 ++++++++++++++-
 .../it/env/cluster/config/MppCommonConfig.java     |  26 ++++
 .../env/cluster/config/MppSharedCommonConfig.java  |  25 ++++
 .../it/env/remote/config/RemoteCommonConfig.java   |  17 +++
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   7 +
 .../it/dual/IoTDBSubscriptionTopicIT.java          | 123 +++++++++++++++++
 .../iotdb/rpc/subscription/config/TopicConfig.java |  32 ++++-
 .../rpc/subscription/config/TopicConstant.java     |   5 +
 .../payload/poll/SubscriptionPollResponse.java     |   3 +
 .../payload/poll/SubscriptionPollResponseType.java |   2 +
 .../payload/poll/TerminationPayload.java           |  42 ++++++
 .../consumer/SubscriptionConsumer.java             |  18 ++-
 .../subscription/CreateSubscriptionProcedure.java  |   2 +
 .../subscription/DropSubscriptionProcedure.java    | 151 +++++++++++----------
 .../db/pipe/agent/task/PipeDataNodeTaskAgent.java  |  18 ++-
 .../agent/SubscriptionConsumerAgent.java           |   7 +
 .../subscription/agent/SubscriptionTopicAgent.java |  12 ++
 .../db/subscription/broker/SubscriptionBroker.java |  13 ++
 .../broker/SubscriptionPrefetchingQueue.java       |  26 +++-
 .../SubscriptionPrefetchingTabletsQueue.java       |  12 ++
 .../broker/SubscriptionPrefetchingTsFileQueue.java |  35 ++---
 .../SubscriptionConnectorSubtaskLifeCycle.java     |   5 +-
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  24 +++-
 .../meta/consumer/ConsumerGroupMeta.java           |  79 +++++++----
 .../commons/subscription/meta/topic/TopicMeta.java |   2 +
 25 files changed, 661 insertions(+), 133 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 3aea4e0a762..3c3ae618ce3 100644
--- a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -25,9 +25,13 @@ import 
org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
+import org.apache.iotdb.session.subscription.consumer.AckStrategy;
+import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
+import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
+import org.apache.iotdb.session.subscription.payload.SubscriptionTsFileHandler;
 
 import org.apache.tsfile.read.TsFileReader;
 import org.apache.tsfile.read.common.Path;
@@ -35,6 +39,7 @@ import org.apache.tsfile.read.expression.QueryExpression;
 import org.apache.tsfile.read.query.dataset.QueryDataSet;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,6 +56,8 @@ public class SubscriptionSessionExample {
 
   private static final String TOPIC_1 = "topic1";
   private static final String TOPIC_2 = "`'topic2'`";
+  private static final String TOPIC_3 = "`\"topic3\"`";
+  private static final String TOPIC_4 = "`\"top \\.i.c4\"`";
 
   private static final long SLEEP_NS = 1_000_000_000L;
   private static final long POLL_TIMEOUT_MS = 10_000L;
@@ -226,10 +233,109 @@ public class SubscriptionSessionExample {
     }
   }
 
+  /** multi push consumer subscribe topic with tsfile format and query mode */
+  private static void dataSubscription3() throws Exception {
+    try (final SubscriptionSession subscriptionSession = new 
SubscriptionSession(HOST, PORT)) {
+      subscriptionSession.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+      subscriptionSession.createTopic(TOPIC_3, config);
+    }
+
+    final List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < PARALLELISM; ++i) {
+      final int idx = i;
+      final Thread thread =
+          new Thread(
+              () -> {
+                // Subscription: builder-style ctor
+                try (final SubscriptionPushConsumer consumer3 =
+                    new SubscriptionPushConsumer.Builder()
+                        .consumerId("c" + idx)
+                        .consumerGroupId("cg3")
+                        .ackStrategy(AckStrategy.AFTER_CONSUME)
+                        .consumeListener(
+                            message -> {
+                              // do something for SubscriptionTsFileHandler
+                              System.out.println(
+                                  
message.getTsFileHandler().getFile().getAbsolutePath());
+                              return ConsumeResult.SUCCESS;
+                            })
+                        .buildPushConsumer()) {
+                  consumer3.open();
+                  consumer3.subscribe(TOPIC_3);
+                  while (consumer3.hasMoreData()) {
+                    LockSupport.parkNanos(SLEEP_NS); // wait some time
+                  }
+                }
+              });
+      thread.start();
+      threads.add(thread);
+    }
+
+    for (final Thread thread : threads) {
+      thread.join();
+    }
+  }
+
+  /** multi pull consumer subscribe topic with tsfile format and query mode */
+  private static void dataSubscription4() throws Exception {
+    try (final SubscriptionSession subscriptionSession = new 
SubscriptionSession(HOST, PORT)) {
+      subscriptionSession.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+      subscriptionSession.createTopic(TOPIC_4, config);
+    }
+
+    final List<Thread> threads = new ArrayList<>();
+    for (int i = 0; i < PARALLELISM; ++i) {
+      final int idx = i;
+      final Thread thread =
+          new Thread(
+              () -> {
+                // Subscription: builder-style ctor
+                try (final SubscriptionPullConsumer consumer4 =
+                    new SubscriptionPullConsumer.Builder()
+                        .consumerId("c" + idx)
+                        .consumerGroupId("cg4")
+                        .autoCommit(true)
+                        .fileSaveFsync(true)
+                        .buildPullConsumer()) {
+                  consumer4.open();
+                  consumer4.subscribe(TOPIC_4);
+                  while (true) {
+                    LockSupport.parkNanos(SLEEP_NS); // wait some time
+                    if (!consumer4.hasMoreData()) {
+                      break;
+                    }
+                    for (final SubscriptionMessage message : 
consumer4.poll(POLL_TIMEOUT_MS)) {
+                      final SubscriptionTsFileHandler handler = 
message.getTsFileHandler();
+                      handler.moveFile(
+                          Paths.get(System.getProperty("user.dir"), 
"new-subscription-dir")
+                              .resolve(handler.getPath().getFileName()));
+                    }
+                  }
+                } catch (final IOException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+      thread.start();
+      threads.add(thread);
+    }
+
+    for (final Thread thread : threads) {
+      thread.join();
+    }
+  }
+
   public static void main(final String[] args) throws Exception {
     prepareData();
     // dataQuery();
     // dataSubscription1();
-    dataSubscription2();
+    // dataSubscription2();
+    // dataSubscription3();
+    dataSubscription4();
   }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 95fed66d08b..095562db5a5 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -438,6 +438,32 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+      int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+    setProperty(
+        "pipe_heartbeat_interval_seconds_for_collecting_pipe_meta",
+        String.valueOf(pipeHeartbeatIntervalSecondsForCollectingPipeMeta));
+    return this;
+  }
+
+  @Override
+  public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
+      long pipeMetaSyncerInitialSyncDelayMinutes) {
+    setProperty(
+        "pipe_meta_syncer_initial_sync_delay_minutes",
+        String.valueOf(pipeMetaSyncerInitialSyncDelayMinutes));
+    return this;
+  }
+
+  @Override
+  public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long 
pipeMetaSyncerSyncIntervalMinutes) {
+    setProperty(
+        "pipe_meta_syncer_sync_interval_minutes",
+        String.valueOf(pipeMetaSyncerSyncIntervalMinutes));
+    return this;
+  }
+
   // For part of the log directory
   public String getClusterConfigStr() {
     return 
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 1851590dd41..7724bcf09c0 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -445,4 +445,29 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     cnConfig.setCnConnectionTimeoutMs(connectionTimeoutMs);
     return this;
   }
+
+  @Override
+  public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+      int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+    dnConfig.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+        pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
+    cnConfig.setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+        pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
+    return this;
+  }
+
+  @Override
+  public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
+      long pipeMetaSyncerInitialSyncDelayMinutes) {
+    
dnConfig.setPipeMetaSyncerInitialSyncDelayMinutes(pipeMetaSyncerInitialSyncDelayMinutes);
+    
cnConfig.setPipeMetaSyncerInitialSyncDelayMinutes(pipeMetaSyncerInitialSyncDelayMinutes);
+    return this;
+  }
+
+  @Override
+  public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long 
pipeMetaSyncerSyncIntervalMinutes) {
+    
dnConfig.setPipeMetaSyncerSyncIntervalMinutes(pipeMetaSyncerSyncIntervalMinutes);
+    
cnConfig.setPipeMetaSyncerSyncIntervalMinutes(pipeMetaSyncerSyncIntervalMinutes);
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 0ac82fc8713..36675e64686 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -313,4 +313,21 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs) {
     return this;
   }
+
+  @Override
+  public CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+      int pipeHeartbeatIntervalSecondsForCollectingPipeMeta) {
+    return this;
+  }
+
+  @Override
+  public CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(
+      long pipeMetaSyncerInitialSyncDelayMinutes) {
+    return this;
+  }
+
+  @Override
+  public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long 
pipeMetaSyncerSyncIntervalMinutes) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index af47c8254e5..c06c45ddcd4 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -139,4 +139,11 @@ public interface CommonConfig {
   CommonConfig setTagAttributeTotalSize(int tagAttributeTotalSize);
 
   CommonConfig setCnConnectionTimeoutMs(int connectionTimeoutMs);
+
+  CommonConfig setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(
+      int pipeHeartbeatIntervalSecondsForCollectingPipeMeta);
+
+  CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(long 
pipeMetaSyncerInitialSyncDelayMinutes);
+
+  CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long 
pipeMetaSyncerSyncIntervalMinutes);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index ac7e8333984..19c033af151 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -20,12 +20,15 @@
 package org.apache.iotdb.subscription.it.dual;
 
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2Subscription;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeCriticalException;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
@@ -33,6 +36,10 @@ import 
org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
 
+import org.apache.tsfile.read.TsFileReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.read.expression.QueryExpression;
+import org.apache.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.tsfile.write.record.Tablet;
 import org.junit.Assert;
 import org.junit.Test;
@@ -44,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import java.sql.Connection;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -63,6 +71,19 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBSubscriptionTopicIT.class);
 
+  @Override
+  protected void setUpConfig() {
+    super.setUpConfig();
+
+    // Shorten heartbeat and sync interval to avoid timeout of query mode test
+    senderEnv
+        .getConfig()
+        .getCommonConfig()
+        .setPipeHeartbeatIntervalSecondsForCollectingPipeMeta(30);
+    
senderEnv.getConfig().getCommonConfig().setPipeMetaSyncerInitialSyncDelayMinutes(1);
+    
senderEnv.getConfig().getCommonConfig().setPipeMetaSyncerSyncIntervalMinutes(1);
+  }
+
   @Test
   public void testTopicPathSubscription() throws Exception {
     // Insert some historical data on sender
@@ -529,6 +550,108 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
     testTopicInvalidRuntimeConfigTemplate("topic10", config);
   }
 
+  @Test
+  public void testTopicWithQueryMode() throws Exception {
+    // Insert some historical data
+    try (final ISession session = senderEnv.getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic11";
+    final String host = senderEnv.getIP();
+    final int port = Integer.parseInt(senderEnv.getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Properties config = new Properties();
+      config.put(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE);
+      config.put(TopicConstant.MODE_KEY, TopicConstant.MODE_QUERY_VALUE);
+      session.createTopic(topicName, config);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final AtomicInteger rowCount = new AtomicInteger();
+    final AtomicBoolean isClosed = new AtomicBoolean(false);
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                  new SubscriptionPullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .autoCommit(false)
+                      .buildPullConsumer()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+                while (!isClosed.get()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  final List<SubscriptionMessage> messages =
+                      
consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS);
+                  for (final SubscriptionMessage message : messages) {
+                    try (final TsFileReader tsFileReader =
+                        message.getTsFileHandler().openReader()) {
+                      final Path path = new Path("root.db.d1", "s1", true);
+                      final QueryDataSet dataSet =
+                          tsFileReader.query(
+                              
QueryExpression.create(Collections.singletonList(path), null));
+                      while (dataSet.hasNext()) {
+                        dataSet.next();
+                        rowCount.addAndGet(1);
+                      }
+                    }
+                  }
+                  consumer.commitSync(messages);
+                }
+                // Exiting the loop represents passing the awaitility test, at this point the result
+                // of 'show subscription' is empty, so there is no need to explicitly unsubscribe.
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getMethodName()));
+    thread.start();
+
+    try {
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () -> {
+            // Check row count
+            Assert.assertEquals(100, rowCount.get());
+            // Check empty subscription
+            try (final SyncConfigNodeIServiceClient client =
+                (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+              final TShowSubscriptionResp showSubscriptionResp =
+                  client.showSubscription(new TShowSubscriptionReq());
+              Assert.assertEquals(
+                  RpcUtils.SUCCESS_STATUS.getCode(), 
showSubscriptionResp.status.getCode());
+              Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+              Assert.assertEquals(0, 
showSubscriptionResp.subscriptionInfoList.size());
+            }
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      isClosed.set(true);
+      thread.join();
+    }
+  }
+
   private void testTopicInvalidRuntimeConfigTemplate(
       final String topicName, final Properties config) throws Exception {
     // Create topic
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
index fe62ba622a9..c3b5f24622c 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConfig.java
@@ -40,6 +40,24 @@ public class TopicConfig extends PipeParameters {
     super(attributes);
   }
 
+  private static final Map<String, String> LOOSE_RANGE_TIME_CONFIG =
+      new HashMap<String, String>() {
+        {
+          put("history.loose-range", "time");
+          put("realtime.loose-range", "time");
+        }
+      };
+
+  private static final Map<String, String> REALTIME_BATCH_MODE_CONFIG =
+      Collections.singletonMap("realtime.mode", "batch");
+  private static final Map<String, String> REALTIME_STREAM_MODE_CONFIG =
+      Collections.singletonMap("realtime.mode", "stream");
+
+  private static final Map<String, String> QUERY_MODE_CONFIG =
+      Collections.singletonMap("mode", "query");
+  private static final Map<String, String> SUBSCRIBE_MODE_CONFIG =
+      Collections.singletonMap("mode", "subscribe");
+
   /////////////////////////////// de/ser ///////////////////////////////
 
   public void serialize(final DataOutputStream stream) throws IOException {
@@ -87,8 +105,7 @@ public class TopicConfig extends PipeParameters {
     // enable loose range when using tsfile format
     if (TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
         attributes.getOrDefault(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_DEFAULT_VALUE))) {
-      attributesWithTimeRange.put("history.loose-range", "time");
-      attributesWithTimeRange.put("realtime.loose-range", "time");
+      attributesWithTimeRange.putAll(LOOSE_RANGE_TIME_CONFIG);
     }
 
     return attributesWithTimeRange;
@@ -97,8 +114,15 @@ public class TopicConfig extends PipeParameters {
   public Map<String, String> getAttributesWithRealtimeMode() {
     return TopicConstant.FORMAT_TS_FILE_HANDLER_VALUE.equals(
             attributes.getOrDefault(TopicConstant.FORMAT_KEY, 
TopicConstant.FORMAT_DEFAULT_VALUE))
-        ? Collections.singletonMap("realtime.mode", "batch")
-        : Collections.singletonMap("realtime.mode", "hybrid");
+        ? REALTIME_BATCH_MODE_CONFIG
+        : REALTIME_STREAM_MODE_CONFIG;
+  }
+
+  public Map<String, String> getAttributesWithSourceMode() {
+    return TopicConstant.MODE_QUERY_VALUE.equals(
+            attributes.getOrDefault(TopicConstant.MODE_KEY, 
TopicConstant.MODE_DEFAULT_VALUE))
+        ? QUERY_MODE_CONFIG
+        : SUBSCRIBE_MODE_CONFIG;
   }
 
   public Map<String, String> getAttributesWithProcessorPrefix() {
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
index bd44cf1bcc9..a6e8ec901bf 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/TopicConstant.java
@@ -30,6 +30,11 @@ public class TopicConstant {
   public static final String END_TIME_KEY = "end-time";
   public static final String NOW_TIME_VALUE = "now";
 
+  public static final String MODE_KEY = "mode";
+  public static final String MODE_SUBSCRIBE_VALUE = "subscribe";
+  public static final String MODE_QUERY_VALUE = "query";
+  public static final String MODE_DEFAULT_VALUE = MODE_SUBSCRIBE_VALUE;
+
   public static final String FORMAT_KEY = "format";
   public static final String FORMAT_SESSION_DATA_SETS_HANDLER_VALUE = 
"SessionDataSetsHandler";
   public static final String FORMAT_TS_FILE_HANDLER_VALUE = "TsFileHandler";
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
index d386903f124..01d173d2742 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponse.java
@@ -95,6 +95,9 @@ public class SubscriptionPollResponse {
         case ERROR:
           payload = new ErrorPayload().deserialize(buffer);
           break;
+        case TERMINATION:
+          payload = new TerminationPayload().deserialize(buffer);
+          break;
         default:
           LOGGER.warn("unexpected response type: {}, payload will be null", 
responseType);
           break;
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
index a22c9590c65..b27791b36c5 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/SubscriptionPollResponseType.java
@@ -31,6 +31,8 @@ public enum SubscriptionPollResponseType {
   FILE_INIT((short) 2),
   FILE_PIECE((short) 3),
   FILE_SEAL((short) 4),
+
+  TERMINATION((short) 5),
   ;
 
   private final short type;
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TerminationPayload.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TerminationPayload.java
new file mode 100644
index 00000000000..c2da3f92e6d
--- /dev/null
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/poll/TerminationPayload.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.rpc.subscription.payload.poll;
+
+import java.io.DataOutputStream;
+import java.nio.ByteBuffer;
+
+public class TerminationPayload implements SubscriptionPollPayload {
+
+  @Override
+  public void serialize(final DataOutputStream stream) {
+    // do nothing
+  }
+
+  @Override
+  public SubscriptionPollPayload deserialize(final ByteBuffer buffer) {
+    // do nothing
+    return null;
+  }
+
+  @Override
+  public String toString() {
+    return "TerminationPayload";
+  }
+}
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index a6b418f0220..410348c991b 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -108,8 +108,12 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     return consumerGroupId;
   }
 
-  public Set<String> getSubscribedTopicNames() {
-    return subscribedTopicNames;
+  /**
+   * @return When <b>only</b> subscribing to the query mode topics, if there is no new data to
+   *     process, return {@code false}; otherwise, return {@code true}.
+   */
+  public boolean hasMoreData() {
+    return !subscribedTopicNames.isEmpty();
   }
 
   /////////////////////////////// ctor ///////////////////////////////
@@ -393,6 +397,16 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
               } else {
                 throw new 
SubscriptionRuntimeNonCriticalException(errorMessage);
               }
+            case TERMINATION:
+              final SubscriptionCommitContext commitContext = 
pollResponse.getCommitContext();
+              final String topicNameToUnsubscribe = 
commitContext.getTopicName();
+              LOGGER.info(
+                  "Termination occurred when SubscriptionConsumer {} polling 
topics {}, unsubscribe topic {} automatically",
+                  this,
+                  topicNames,
+                  topicNameToUnsubscribe);
+              unsubscribe(Collections.singleton(topicNameToUnsubscribe), 
false);
+              break;
             default:
               LOGGER.warn("unexpected response type: {}", responseType);
               break;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 8710eec8522..0894feb62ad 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -229,6 +229,7 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
   protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) {
     LOGGER.info("CreateSubscriptionProcedure: 
rollbackFromOperateOnConfigNodes");
 
+    // TODO: roll back from the last executed procedure to the first executed
     alterConsumerGroupProcedure.rollbackFromOperateOnConfigNodes(env);
 
     TSStatus response;
@@ -283,6 +284,7 @@ public class CreateSubscriptionProcedure extends 
AbstractOperateSubscriptionAndP
   protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
     LOGGER.info("CreateSubscriptionProcedure: rollbackFromOperateOnDataNodes");
 
+    // TODO: roll back from the last executed procedure to the first executed
     alterConsumerGroupProcedure.rollbackFromOperateOnDataNodes(env);
 
     // Push all topic metas to datanode, may be time-consuming
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
index efb5aa22827..89da284ac0d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
@@ -62,9 +62,10 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
 
   private TUnsubscribeReq unsubscribeReq;
 
-  private AlterConsumerGroupProcedure alterConsumerGroupProcedure;
-  private List<AlterTopicProcedure> alterTopicProcedures = new ArrayList<>();
+  // NOTE: The 'drop pipe' operation should be performed before 'alter 
consumer group'.
   private List<DropPipeProcedureV2> dropPipeProcedures = new ArrayList<>();
+  private List<AlterTopicProcedure> alterTopicProcedures = new ArrayList<>();
+  private AlterConsumerGroupProcedure alterConsumerGroupProcedure;
 
   // Record failed index of procedures to rollback properly.
   // We only record fail index when executing on config nodes, because when 
executing on data nodes
@@ -76,7 +77,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
     super();
   }
 
-  public DropSubscriptionProcedure(TUnsubscribeReq unsubscribeReq) {
+  public DropSubscriptionProcedure(final TUnsubscribeReq unsubscribeReq) {
     this.unsubscribeReq = unsubscribeReq;
   }
 
@@ -86,27 +87,28 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
   }
 
   @Override
-  protected void executeFromValidate(ConfigNodeProcedureEnv env) throws 
SubscriptionException {
+  protected void executeFromValidate(final ConfigNodeProcedureEnv env)
+      throws SubscriptionException {
     LOGGER.info("DropSubscriptionProcedure: executeFromValidate");
 
     subscriptionInfo.get().validateBeforeUnsubscribe(unsubscribeReq);
 
     // Construct AlterConsumerGroupProcedure
-    ConsumerGroupMeta updatedConsumerGroupMeta =
+    final ConsumerGroupMeta updatedConsumerGroupMeta =
         
subscriptionInfo.get().deepCopyConsumerGroupMeta(unsubscribeReq.getConsumerGroupId());
 
     // Get topics subscribed by no consumers in this group after this 
un-subscription
-    Set<String> topicsUnsubByGroup =
+    final Set<String> topicsUnsubByGroup =
         updatedConsumerGroupMeta.removeSubscription(
             unsubscribeReq.getConsumerId(), unsubscribeReq.getTopicNames());
     alterConsumerGroupProcedure =
         new AlterConsumerGroupProcedure(updatedConsumerGroupMeta, 
subscriptionInfo);
 
-    for (String topic : unsubscribeReq.getTopicNames()) {
+    for (final String topic : unsubscribeReq.getTopicNames()) {
       if (topicsUnsubByGroup.contains(topic)) {
         // Topic will be subscribed by no consumers in this group
 
-        TopicMeta updatedTopicMeta = 
subscriptionInfo.get().deepCopyTopicMeta(topic);
+        final TopicMeta updatedTopicMeta = 
subscriptionInfo.get().deepCopyTopicMeta(topic);
         
updatedTopicMeta.removeSubscribedConsumerGroup(unsubscribeReq.getConsumerGroupId());
 
         alterTopicProcedures.add(new AlterTopicProcedure(updatedTopicMeta, 
subscriptionInfo));
@@ -118,38 +120,39 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
       }
     }
 
-    alterConsumerGroupProcedure.executeFromValidate(env);
+    // Validate DropPipeProcedureV2s
+    for (final DropPipeProcedureV2 dropPipeProcedure : dropPipeProcedures) {
+      dropPipeProcedure.executeFromValidateTask(env);
+      dropPipeProcedure.executeFromCalculateInfoForTask(env);
+    }
 
-    for (AlterTopicProcedure alterTopicProcedure : alterTopicProcedures) {
+    // Validate AlterTopicProcedures
+    for (final AlterTopicProcedure alterTopicProcedure : alterTopicProcedures) 
{
       alterTopicProcedure.executeFromValidate(env);
     }
 
-    for (DropPipeProcedureV2 dropPipeProcedure : dropPipeProcedures) {
-      dropPipeProcedure.executeFromValidateTask(env);
-      dropPipeProcedure.executeFromCalculateInfoForTask(env);
-    }
+    // Validate AlterConsumerGroupProcedure
+    alterConsumerGroupProcedure.executeFromValidate(env);
   }
 
   @Override
-  protected void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env)
+  protected void executeFromOperateOnConfigNodes(final ConfigNodeProcedureEnv 
env)
       throws SubscriptionException {
     LOGGER.info("DropSubscriptionProcedure: executeFromOperateOnConfigNodes");
 
-    alterConsumerGroupProcedure.executeFromOperateOnConfigNodes(env);
-
     TSStatus response;
 
-    List<AlterTopicPlan> alterTopicPlans =
-        alterTopicProcedures.stream()
-            .map(AlterTopicProcedure::getUpdatedTopicMeta)
-            .map(AlterTopicPlan::new)
+    // Execute DropPipeProcedureV2s
+    final List<ConfigPhysicalPlan> dropPipePlans =
+        dropPipeProcedures.stream()
+            .map(proc -> new DropPipePlanV2(proc.getPipeName()))
             .collect(Collectors.toList());
     try {
       response =
           env.getConfigManager()
               .getConsensusManager()
-              .write(new AlterMultipleTopicsPlan(alterTopicPlans));
-    } catch (ConsensusException e) {
+              .write(new OperateMultiplePipesPlanV2(dropPipePlans));
+    } catch (final ConsensusException e) {
       LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
@@ -157,19 +160,21 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
     if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
         && response.getSubStatusSize() > 0) {
       // Record the failed index for rollback
-      alterTopicProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
+      dropPipeProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
     }
 
-    List<ConfigPhysicalPlan> dropPipePlans =
-        dropPipeProcedures.stream()
-            .map(proc -> new DropPipePlanV2(proc.getPipeName()))
+    // Execute AlterTopicProcedures
+    final List<AlterTopicPlan> alterTopicPlans =
+        alterTopicProcedures.stream()
+            .map(AlterTopicProcedure::getUpdatedTopicMeta)
+            .map(AlterTopicPlan::new)
             .collect(Collectors.toList());
     try {
       response =
           env.getConfigManager()
               .getConsensusManager()
-              .write(new OperateMultiplePipesPlanV2(dropPipePlans));
-    } catch (ConsensusException e) {
+              .write(new AlterMultipleTopicsPlan(alterTopicPlans));
+    } catch (final ConsensusException e) {
       LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
@@ -177,34 +182,24 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
     if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
         && response.getSubStatusSize() > 0) {
       // Record the failed index for rollback
-      dropPipeProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
+      alterTopicProcedureFailIndexOnCN = response.getSubStatusSize() - 1;
     }
+
+    // Execute AlterConsumerGroupProcedure
+    alterConsumerGroupProcedure.executeFromOperateOnConfigNodes(env);
   }
 
   @Override
-  protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
+  protected void executeFromOperateOnDataNodes(final ConfigNodeProcedureEnv 
env)
       throws SubscriptionException, IOException {
     LOGGER.info("DropSubscriptionProcedure: executeFromOperateOnDataNodes");
 
-    alterConsumerGroupProcedure.executeFromOperateOnDataNodes(env);
-
-    // push topic meta to data nodes
-    List<ByteBuffer> topicMetaBinaryList = new ArrayList<>();
-    for (AlterTopicProcedure alterTopicProcedure : alterTopicProcedures) {
-      
topicMetaBinaryList.add(alterTopicProcedure.getUpdatedTopicMeta().serialize());
-    }
-    if 
(pushTopicMetaHasException(env.pushMultiTopicMetaToDataNodes(topicMetaBinaryList)))
 {
-      // If not all topic meta are pushed successfully, the meta can be pushed 
during meta sync.
-      LOGGER.warn(
-          "Failed to alter topics when creating subscription, metadata will be 
synchronized later.");
-    }
-
-    // push pipe meta to data nodes
-    List<String> pipeNames =
+    // Push pipe meta to data nodes
+    final List<String> pipeNames =
         dropPipeProcedures.stream()
             .map(DropPipeProcedureV2::getPipeName)
             .collect(Collectors.toList());
-    String exceptionMessage =
+    final String exceptionMessage =
         AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
             null, dropMultiPipeOnDataNodes(pipeNames, env));
     if (!exceptionMessage.isEmpty()) {
@@ -214,23 +209,37 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
           pipeNames,
           exceptionMessage);
     }
+
+    // Push topic meta to data nodes: serialize the updated meta of every altered topic and
+    // send them to the data nodes in a single batch.
+    final List<ByteBuffer> topicMetaBinaryList = new ArrayList<>();
+    for (final AlterTopicProcedure alterTopicProcedure : alterTopicProcedures) {
+      topicMetaBinaryList.add(alterTopicProcedure.getUpdatedTopicMeta().serialize());
+    }
+    if (pushTopicMetaHasException(env.pushMultiTopicMetaToDataNodes(topicMetaBinaryList))) {
+      // If not all topic meta are pushed successfully, the meta can be pushed during meta sync,
+      // so a failure here is only logged and does not abort the procedure.
+      LOGGER.warn(
+          "Failed to alter topics when dropping subscription, metadata will be synchronized later.");
+    }
+
+    // Push consumer group meta to data nodes
+    alterConsumerGroupProcedure.executeFromOperateOnDataNodes(env);
   }
 
   @Override
-  protected void rollbackFromValidate(ConfigNodeProcedureEnv env) {
-    LOGGER.info("DropSubscriptionProcedure: rollbackFromLock");
+  protected void rollbackFromValidate(final ConfigNodeProcedureEnv env) {
+    // This rollback step performs no undo action; it only records that it was reached.
+    LOGGER.info("DropSubscriptionProcedure: rollbackFromValidate");
   }
 
   @Override
-  protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) {
+  protected void rollbackFromOperateOnConfigNodes(final ConfigNodeProcedureEnv 
env) {
     LOGGER.info("DropSubscriptionProcedure: rollbackFromOperateOnConfigNodes");
 
+    // Rollback AlterConsumerGroupProcedure
     alterConsumerGroupProcedure.rollbackFromOperateOnConfigNodes(env);
 
+    // Rollback AlterTopicProcedures
     TSStatus response;
-
-    // rollback alterTopicProcedures
-    List<AlterTopicPlan> alterTopicRollbackPlans = new ArrayList<>();
+    final List<AlterTopicPlan> alterTopicRollbackPlans = new ArrayList<>();
     for (int i = 0;
         i <= Math.min(alterTopicProcedureFailIndexOnCN, 
alterTopicProcedures.size());
         i++) {
@@ -242,23 +251,25 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
           env.getConfigManager()
               .getConsensusManager()
               .write(new AlterMultipleTopicsPlan(alterTopicRollbackPlans));
-    } catch (ConsensusException e) {
+    } catch (final ConsensusException e) {
       LOGGER.warn("Failed in the write API executing the consensus layer due 
to: ", e);
       response = new 
TSStatus(TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode());
       response.setMessage(e.getMessage());
     }
-    // if failed to rollback, throw exception
+    // If failed to rollback, throw exception
     if (response.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       throw new SubscriptionException(response.getMessage());
     }
 
-    // Do nothing to rollback dropPipeProcedures
+    // Do nothing to rollback DropPipeProcedureV2s
   }
 
   @Override
-  protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) 
throws IOException {
+  protected void rollbackFromOperateOnDataNodes(final ConfigNodeProcedureEnv 
env)
+      throws IOException {
     LOGGER.info("DropSubscriptionProcedure: rollbackFromOperateOnDataNodes");
 
+    // Rollback AlterConsumerGroupProcedure
     alterConsumerGroupProcedure.rollbackFromOperateOnDataNodes(env);
 
     // Push all topic metas to datanode, may be time-consuming
@@ -268,7 +279,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
     }
 
     // Push all pipe metas to datanode, may be time-consuming
-    String exceptionMessage =
+    final String exceptionMessage =
         AbstractOperatePipeProcedureV2.parsePushPipeMetaExceptionForPipe(
             null, AbstractOperatePipeProcedureV2.pushPipeMetaToDataNodes(env, 
pipeTaskInfo));
     if (!exceptionMessage.isEmpty()) {
@@ -279,7 +290,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
   }
 
   @Override
-  public void serialize(DataOutputStream stream) throws IOException {
+  public void serialize(final DataOutputStream stream) throws IOException {
     stream.writeShort(ProcedureType.DROP_SUBSCRIPTION_PROCEDURE.getTypeCode());
 
     super.serialize(stream);
@@ -289,7 +300,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
     final int size = unsubscribeReq.getTopicNamesSize();
     ReadWriteIOUtils.write(size, stream);
     if (size != 0) {
-      for (String topicName : unsubscribeReq.getTopicNames()) {
+      for (final String topicName : unsubscribeReq.getTopicNames()) {
         ReadWriteIOUtils.write(topicName, stream);
       }
     }
@@ -306,7 +317,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
     if (alterTopicProcedures != null) {
       ReadWriteIOUtils.write(true, stream);
       ReadWriteIOUtils.write(alterTopicProcedures.size(), stream);
-      for (AlterTopicProcedure topicProcedure : alterTopicProcedures) {
+      for (final AlterTopicProcedure topicProcedure : alterTopicProcedures) {
         topicProcedure.serialize(stream);
       }
     } else {
@@ -317,7 +328,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
     if (dropPipeProcedures != null) {
       ReadWriteIOUtils.write(true, stream);
       ReadWriteIOUtils.write(dropPipeProcedures.size(), stream);
-      for (AbstractOperatePipeProcedureV2 pipeProcedure : dropPipeProcedures) {
+      for (final AbstractOperatePipeProcedureV2 pipeProcedure : 
dropPipeProcedures) {
         pipeProcedure.serialize(stream);
       }
     } else {
@@ -326,7 +337,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
   }
 
   @Override
-  public void deserialize(ByteBuffer byteBuffer) {
+  public void deserialize(final ByteBuffer byteBuffer) {
     super.deserialize(byteBuffer);
 
     unsubscribeReq =
@@ -355,7 +366,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
         // This readShort should return ALTER_TOPIC_PROCEDURE, and we ignore 
it.
         ReadWriteIOUtils.readShort(byteBuffer);
 
-        AlterTopicProcedure topicProcedure = new AlterTopicProcedure();
+        final AlterTopicProcedure topicProcedure = new AlterTopicProcedure();
         topicProcedure.deserialize(byteBuffer);
         alterTopicProcedures.add(topicProcedure);
       }
@@ -366,9 +377,9 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
       size = ReadWriteIOUtils.readInt(byteBuffer);
       for (int i = 0; i < size; ++i) {
         // This readShort should return DROP_PIPE_PROCEDURE.
-        short typeCode = ReadWriteIOUtils.readShort(byteBuffer);
+        final short typeCode = ReadWriteIOUtils.readShort(byteBuffer);
         if (typeCode == ProcedureType.DROP_PIPE_PROCEDURE_V2.getTypeCode()) {
-          DropPipeProcedureV2 dropPipeProcedureV2 = new DropPipeProcedureV2();
+          final DropPipeProcedureV2 dropPipeProcedureV2 = new 
DropPipeProcedureV2();
           dropPipeProcedureV2.deserialize(byteBuffer);
           dropPipeProcedures.add(dropPipeProcedureV2);
         }
@@ -377,14 +388,14 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
   }
 
   @Override
-  public boolean equals(Object o) {
+  public boolean equals(final Object o) {
     if (this == o) {
       return true;
     }
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-    DropSubscriptionProcedure that = (DropSubscriptionProcedure) o;
+    final DropSubscriptionProcedure that = (DropSubscriptionProcedure) o;
     return Objects.equals(getProcId(), that.getProcId())
         && Objects.equals(getCurrentState(), that.getCurrentState())
         && getCycles() == that.getCycles()
@@ -408,7 +419,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
 
   @TestOnly
   public void setAlterConsumerGroupProcedure(
-      AlterConsumerGroupProcedure alterConsumerGroupProcedure) {
+      final AlterConsumerGroupProcedure alterConsumerGroupProcedure) {
     this.alterConsumerGroupProcedure = alterConsumerGroupProcedure;
   }
 
@@ -418,7 +429,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
   }
 
   @TestOnly
-  public void setAlterTopicProcedures(List<AlterTopicProcedure> 
alterTopicProcedures) {
+  public void setAlterTopicProcedures(final List<AlterTopicProcedure> 
alterTopicProcedures) {
     this.alterTopicProcedures = alterTopicProcedures;
   }
 
@@ -428,7 +439,7 @@ public class DropSubscriptionProcedure extends 
AbstractOperateSubscriptionAndPip
   }
 
   @TestOnly
-  public void setDropPipeProcedures(List<DropPipeProcedureV2> 
dropPipeProcedures) {
+  public void setDropPipeProcedures(final List<DropPipeProcedureV2> 
dropPipeProcedures) {
     this.dropPipeProcedures = dropPipeProcedures;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
index 37c04e26f50..e50f1edcc69 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java
@@ -238,16 +238,22 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
   }
 
   @Override
-  protected void dropPipe(final String pipeName, final long creationTime) {
-    super.dropPipe(pipeName, creationTime);
+  protected boolean dropPipe(final String pipeName, final long creationTime) {
+    if (!super.dropPipe(pipeName, creationTime)) {
+      return false;
+    }
 
     PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
         .deregister(pipeName + "_" + creationTime);
+
+    return true;
   }
 
   @Override
-  protected void dropPipe(final String pipeName) {
-    super.dropPipe(pipeName);
+  protected boolean dropPipe(final String pipeName) {
+    if (!super.dropPipe(pipeName)) {
+      return false;
+    }
 
     final PipeMeta pipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
     if (Objects.nonNull(pipeMeta)) {
@@ -255,6 +261,8 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
       PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
           .deregister(pipeName + "_" + creationTime);
     }
+
+    return true;
   }
 
   public void stopAllPipesWithCriticalException() {
@@ -612,7 +620,7 @@ public class PipeDataNodeTaskAgent extends PipeTaskAgent {
 
   ///////////////////////// Pipe Consensus /////////////////////////
 
-  public ProgressIndex getPipeTaskProgressIndex(String pipeName, int 
consensusGroupId) {
+  public ProgressIndex getPipeTaskProgressIndex(final String pipeName, final 
int consensusGroupId) {
     if (!tryReadLockWithTimeOut(10)) {
       throw new PipeException(
           String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index 5284d295863..983abb03642 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -114,6 +114,13 @@ public class SubscriptionConsumerAgent {
       return;
     }
 
+    // unbind and remove prefetching queue
+    final Set<String> topicsUnsubByGroup =
+        ConsumerGroupMeta.getTopicsUnsubByGroup(metaInAgent, 
metaFromCoordinator);
+    for (final String topicName : topicsUnsubByGroup) {
+      SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, 
topicName, true);
+    }
+
     // TODO: Currently we fully replace the entire ConsumerGroupMeta without 
carefully checking the
     // changes in its fields.
     consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
index 45145bd2b67..bc174170422 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java
@@ -149,4 +149,16 @@ public class SubscriptionTopicAgent {
       releaseReadLock();
     }
   }
+
+  /**
+   * Returns the mode configured for the given topic.
+   *
+   * <p>The value is read from the topic's config via {@code getStringOrDefault}, keyed by {@code
+   * TopicConstant.MODE_KEY} with {@code TopicConstant.MODE_DEFAULT_VALUE} passed as the fallback.
+   * The lookup runs between {@code acquireReadLock()} and {@code releaseReadLock()}; the lock is
+   * released in a {@code finally} block.
+   *
+   * <p>NOTE(review): the result of {@code getTopicMeta(topicName)} is dereferenced without a
+   * null check -- confirm that callers only pass topics known to this agent.
+   *
+   * @param topicName name of the topic whose mode is requested
+   * @return the mode string obtained from the topic's config
+   */
+  public String getTopicMode(final String topicName) {
+    acquireReadLock();
+    try {
+      return topicMetaKeeper
+          .getTopicMeta(topicName)
+          .getConfig()
+          .getStringOrDefault(TopicConstant.MODE_KEY, 
TopicConstant.MODE_DEFAULT_VALUE);
+    } finally {
+      releaseReadLock();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index d26ea8b4458..f3bd0371fdd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -65,6 +65,14 @@ public class SubscriptionBroker {
       final String topicName = entry.getKey();
       final SubscriptionPrefetchingQueue prefetchingQueue = entry.getValue();
       if (topicNames.contains(topicName)) {
+        // before determining if it is closed
+        if (prefetchingQueue.isCompleted()) {
+          LOGGER.info(
+              "Subscription: prefetching queue bound to topic [{}] is 
completed, return termination response to client",
+              topicName);
+          
events.add(prefetchingQueue.generateSubscriptionPollTerminationResponse());
+          continue;
+        }
         if (prefetchingQueue.isClosed()) {
           LOGGER.warn("Subscription: prefetching queue bound to topic [{}] is 
closed", topicName);
           continue;
@@ -181,6 +189,11 @@ public class SubscriptionBroker {
     // clean up events in prefetching queue, this operation is idempotent
     prefetchingQueue.cleanup();
 
+    // mark prefetching queue completed only for topic of query mode
+    if 
(SubscriptionAgent.topic().getTopicMode(topicName).equals(TopicConstant.MODE_QUERY_VALUE))
 {
+      prefetchingQueue.markCompleted();
+    }
+
     if (doRemove) {
       topicNameToPrefetchingQueue.remove(topicName);
       SubscriptionPrefetchingQueueMetrics.getInstance()
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
index 9dec709230f..d93f64e2f18 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java
@@ -25,11 +25,16 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
 import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
 import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
+import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponse;
+import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionPollResponseType;
+import org.apache.iotdb.rpc.subscription.payload.poll.TerminationPayload;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -124,7 +129,7 @@ public abstract class SubscriptionPrefetchingQueue {
         subscriptionCommitIdGenerator.getAndIncrement());
   }
 
-  protected SubscriptionCommitContext 
generateInvalidSubscriptionCommitContext() {
+  private SubscriptionCommitContext generateInvalidSubscriptionCommitContext() 
{
     return new SubscriptionCommitContext(
         IoTDBDescriptor.getInstance().getConfig().getDataNodeId(),
         PipeAgent.runtime().getRebootTimes(),
@@ -176,4 +181,23 @@ public abstract class SubscriptionPrefetchingQueue {
   public void markCompleted() {
     isCompleted = true;
   }
+
+  /**
+   * Builds the event returned to a polling consumer to signal termination.
+   *
+   * @return a {@link SubscriptionEvent} constructed with an empty list and a poll response of
+   *     type {@code TERMINATION} that carries a fresh {@link TerminationPayload} and the commit
+   *     context produced by {@code generateInvalidSubscriptionCommitContext()}
+   */
+  public SubscriptionEvent generateSubscriptionPollTerminationResponse() {
+    final SubscriptionPollResponse terminationResponse =
+        new SubscriptionPollResponse(
+            SubscriptionPollResponseType.TERMINATION.getType(),
+            new TerminationPayload(),
+            generateInvalidSubscriptionCommitContext());
+    return new SubscriptionEvent(Collections.emptyList(), terminationResponse);
+  }
+
+  /**
+   * Builds the event returned to a polling consumer to report an error.
+   *
+   * @param errorMessage message placed into the {@link ErrorPayload}
+   * @param critical flag placed into the {@link ErrorPayload} alongside the message
+   * @return a {@link SubscriptionEvent} constructed with an empty list and a poll response of
+   *     type {@code ERROR} that carries the payload and the commit context produced by {@code
+   *     generateInvalidSubscriptionCommitContext()}
+   */
+  public SubscriptionEvent generateSubscriptionPollErrorResponse(
+      final String errorMessage, final boolean critical) {
+    final SubscriptionPollResponse errorResponse =
+        new SubscriptionPollResponse(
+            SubscriptionPollResponseType.ERROR.getType(),
+            new ErrorPayload(errorMessage, critical),
+            generateInvalidSubscriptionCommitContext());
+    return new SubscriptionEvent(Collections.emptyList(), errorResponse);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
index 14d3315e1b7..935494537b3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTabletsQueue.java
@@ -25,6 +25,7 @@ import 
org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
 import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -138,6 +139,17 @@ public class SubscriptionPrefetchingTabletsQueue extends 
SubscriptionPrefetching
         continue;
       }
 
+      if (event instanceof PipeTerminateEvent) {
+        LOGGER.info(
+            "Subscription: SubscriptionPrefetchingTabletsQueue {} commit PipeTerminateEvent {}",
+            this,
+            event);
+        // Commit directly: the terminate event is not processed further in this loop, so its
+        // reference is released here, passing this queue's own class name as the holder.
+        ((PipeTerminateEvent) event)
+            .decreaseReferenceCount(SubscriptionPrefetchingTabletsQueue.class.getName(), true);
+        continue;
+      }
+
       if (event instanceof TabletInsertionEvent) {
         final List<Tablet> currentTablets = 
convertToTablets((TabletInsertionEvent) event);
         if (currentTablets.isEmpty()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
index 3f2dac42db6..dbb68809681 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingTsFileQueue.java
@@ -22,12 +22,13 @@ package org.apache.iotdb.db.subscription.broker;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import 
org.apache.iotdb.commons.pipe.task.connection.UnboundedBlockingPendingQueue;
 import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
 import org.apache.iotdb.db.subscription.event.SubscriptionEventBinaryCache;
 import org.apache.iotdb.db.subscription.event.SubscriptionTsFileEvent;
 import org.apache.iotdb.pipe.api.event.Event;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
-import org.apache.iotdb.rpc.subscription.payload.poll.ErrorPayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.FileInitPayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.FilePiecePayload;
 import org.apache.iotdb.rpc.subscription.payload.poll.FileSealPayload;
@@ -42,7 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -80,7 +80,7 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
   }
 
   @Override
-  public SubscriptionTsFileEvent poll(final String consumerId) {
+  public SubscriptionEvent poll(final String consumerId) {
     if (hasUnPollableOnTheFlySubscriptionTsFileEvent(consumerId)) {
       return null;
     }
@@ -102,6 +102,17 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
         continue;
       }
 
+      if (event instanceof PipeTerminateEvent) {
+        LOGGER.info(
+            "Subscription: SubscriptionPrefetchingTsFileQueue {} commit 
PipeTerminateEvent {}",
+            this,
+            event);
+        // commit directly
+        ((PipeTerminateEvent) event)
+            
.decreaseReferenceCount(SubscriptionPrefetchingTsFileQueue.class.getName(), 
true);
+        continue;
+      }
+
       if (event instanceof TabletInsertionEvent) {
         final String errorMessage =
             String.format(
@@ -136,7 +147,7 @@ public class SubscriptionPrefetchingTsFileQueue extends 
SubscriptionPrefetchingQ
     return null;
   }
 
-  public synchronized @NonNull SubscriptionTsFileEvent pollTsFile(
+  public synchronized @NonNull SubscriptionEvent pollTsFile(
       final String consumerId, final String fileName, final long 
writingOffset) {
     // 1. Extract current event and check it
     final SubscriptionTsFileEvent event = 
consumerIdToCurrentEventMap.get(consumerId);
@@ -281,7 +292,7 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ
     return pollTsFile(consumerId, writingOffset, event);
   }
 
-  private synchronized @NonNull SubscriptionTsFileEvent pollTsFile(
+  private synchronized @NonNull SubscriptionEvent pollTsFile(
       final String consumerId, final long writingOffset, final 
SubscriptionTsFileEvent event) {
     Pair<SubscriptionTsFileEvent, Boolean> newEventWithCommittable =
         event.matchOrResetNext(writingOffset);
@@ -384,18 +395,8 @@ public class SubscriptionPrefetchingTsFileQueue extends SubscriptionPrefetchingQ
     return null;
   }
 
-  private SubscriptionTsFileEvent generateSubscriptionPollErrorResponse(
-      final String errorMessage, final boolean critical) {
-    return new SubscriptionTsFileEvent(
-        Collections.emptyList(),
-        new SubscriptionPollResponse(
-            SubscriptionPollResponseType.ERROR.getType(),
-            new ErrorPayload(errorMessage, critical),
-            super.generateInvalidSubscriptionCommitContext()));
-  }
-
-  private SubscriptionTsFileEvent generateSubscriptionPollErrorResponse(final 
String errorMessage) {
+  private SubscriptionEvent generateSubscriptionPollErrorResponse(final String 
errorMessage) {
     // consider non-critical by default, meaning the client can retry
-    return generateSubscriptionPollErrorResponse(errorMessage, false);
+    return super.generateSubscriptionPollErrorResponse(errorMessage, false);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
index 43827ab85c4..41e690cbce9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskLifeCycle.java
@@ -98,8 +98,11 @@ public class SubscriptionConnectorSubtaskLifeCycle extends PipeConnectorSubtaskL
   public synchronized void close() {
     super.close();
 
+    // Here, the prefetching queue is not actually removed, because it's uncertain whether the
+    // corresponding underlying pipe is automatically terminated. The actual removal is carried out
+    // when dropping the subscription.
     final String consumerGroupId = ((SubscriptionConnectorSubtask) 
subtask).getConsumerGroupId();
     final String topicName = ((SubscriptionConnectorSubtask) 
subtask).getTopicName();
-    SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, 
topicName, true);
+    SubscriptionAgent.broker().unbindPrefetchingQueue(consumerGroupId, 
topicName, false);
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index a1844bef572..65c1a95aaa3 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -103,7 +103,7 @@ public abstract class PipeTaskAgent {
     pipeMetaKeeper.acquireWriteLock();
   }
 
-  protected boolean tryWriteLockWithTimeOut(long timeOutInSeconds) {
+  protected boolean tryWriteLockWithTimeOut(final long timeOutInSeconds) {
     try {
       return pipeMetaKeeper.tryWriteLock(timeOutInSeconds);
     } catch (final InterruptedException e) {
@@ -462,11 +462,14 @@ public abstract class PipeTaskAgent {
   protected abstract Map<Integer, PipeTask> buildPipeTasks(final PipeMeta 
pipeMetaFromCoordinator)
       throws IllegalPathException;
 
-  protected void dropPipe(final String pipeName, final long creationTime) {
+  /**
+   * @return {@code true} if a pipe has indeed been dropped, otherwise {@code false}.
+   */
+  protected boolean dropPipe(final String pipeName, final long creationTime) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (!checkBeforeDropPipe(existedPipeMeta, pipeName, creationTime)) {
-      return;
+      return false;
     }
 
     // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
@@ -483,7 +486,7 @@ public abstract class PipeTaskAgent {
               + "Skip dropping.",
           pipeName,
           creationTime);
-      return;
+      return false;
     }
 
     // Trigger drop() method for each pipe task by parallel stream
@@ -496,13 +499,18 @@ public abstract class PipeTaskAgent {
 
     // Remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
+
+    return true;
   }
 
-  protected void dropPipe(final String pipeName) {
+  /**
+   * @return {@code true} if a pipe has indeed been dropped, otherwise {@code false}.
+   */
+  protected boolean dropPipe(final String pipeName) {
     final PipeMeta existedPipeMeta = pipeMetaKeeper.getPipeMeta(pipeName);
 
     if (!checkBeforeDropPipe(existedPipeMeta, pipeName)) {
-      return;
+      return false;
     }
 
     // Mark pipe meta as dropped first. This will help us detect if the pipe meta has been dropped
@@ -516,7 +524,7 @@ public abstract class PipeTaskAgent {
     if (pipeTasks == null) {
       LOGGER.info(
           "Pipe {} has already been dropped or has not been created. Skip 
dropping.", pipeName);
-      return;
+      return false;
     }
 
     // Trigger drop() method for each pipe task by parallel stream
@@ -529,6 +537,8 @@ public abstract class PipeTaskAgent {
 
     // Remove pipe meta from pipe meta keeper
     pipeMetaKeeper.removePipeMeta(pipeName);
+
+    return true;
   }
 
   private void startPipe(final String pipeName, final long creationTime) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index d53868208bf..764fbfca501 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -48,7 +48,7 @@ public class ConsumerGroupMeta {
   }
 
   public ConsumerGroupMeta(
-      String consumerGroupId, long creationTime, ConsumerMeta 
firstConsumerMeta) {
+      final String consumerGroupId, final long creationTime, final 
ConsumerMeta firstConsumerMeta) {
     this.consumerGroupId = consumerGroupId;
     this.creationTime = creationTime;
     this.topicNameToSubscribedConsumerIdSet = new HashMap<>();
@@ -74,20 +74,48 @@ public class ConsumerGroupMeta {
     return creationTime;
   }
 
+  public static /* @NonNull */ Set<String> getTopicsUnsubByGroup(
+      final ConsumerGroupMeta currentMeta, final ConsumerGroupMeta 
updatedMeta) {
+    if (!Objects.equals(currentMeta.consumerGroupId, 
updatedMeta.consumerGroupId)) {
+      return Collections.emptySet();
+    }
+    if (!Objects.equals(currentMeta.creationTime, updatedMeta.creationTime)) {
+      return Collections.emptySet();
+    }
+
+    // no need to check consumerIdToConsumerMeta here to avoid potential inconsistent meta
+
+    final Set<String> unsubscribedTopicNames = new HashSet<>();
+    currentMeta
+        .topicNameToSubscribedConsumerIdSet
+        .keySet()
+        .forEach(
+            (topicName) -> {
+              if 
(!updatedMeta.topicNameToSubscribedConsumerIdSet.containsKey(topicName)) {
+                unsubscribedTopicNames.add(topicName);
+              }
+            });
+    return unsubscribedTopicNames;
+  }
+
   /////////////////////////////// consumer ///////////////////////////////
 
-  public void addConsumer(ConsumerMeta consumerMeta) {
+  public void addConsumer(final ConsumerMeta consumerMeta) {
     consumerIdToConsumerMeta.put(consumerMeta.getConsumerId(), consumerMeta);
   }
 
-  public void removeConsumer(String consumerId) {
+  public void removeConsumer(final String consumerId) {
     consumerIdToConsumerMeta.remove(consumerId);
-    for (Set<String> subscribedConsumers : 
topicNameToSubscribedConsumerIdSet.values()) {
-      subscribedConsumers.remove(consumerId);
+    for (final Map.Entry<String, Set<String>> entry :
+        topicNameToSubscribedConsumerIdSet.entrySet()) {
+      entry.getValue().remove(consumerId);
+      if (entry.getValue().isEmpty()) {
+        topicNameToSubscribedConsumerIdSet.remove(entry.getKey());
+      }
     }
   }
 
-  public boolean containsConsumer(String consumerId) {
+  public boolean containsConsumer(final String consumerId) {
     return consumerIdToConsumerMeta.containsKey(consumerId);
   }
 
@@ -106,13 +134,13 @@ public class ConsumerGroupMeta {
    * @return The set of consumer IDs subscribing the given topic in this group. If no consumer is
    *     subscribing the topic, return an empty set.
    */
-  public Set<String> getConsumersSubscribingTopic(String topic) {
+  public Set<String> getConsumersSubscribingTopic(final String topic) {
     return topicNameToSubscribedConsumerIdSet.getOrDefault(topic, 
Collections.emptySet());
   }
 
-  public Set<String> getTopicsSubscribedByConsumer(String consumerId) {
-    Set<String> topics = new HashSet<>();
-    for (Map.Entry<String, Set<String>> topicNameToSubscribedConsumerId :
+  public Set<String> getTopicsSubscribedByConsumer(final String consumerId) {
+    final Set<String> topics = new HashSet<>();
+    for (final Map.Entry<String, Set<String>> topicNameToSubscribedConsumerId :
         topicNameToSubscribedConsumerIdSet.entrySet()) {
       if (topicNameToSubscribedConsumerId.getValue().contains(consumerId)) {
         topics.add(topicNameToSubscribedConsumerId.getKey());
@@ -121,7 +149,7 @@ public class ConsumerGroupMeta {
     return topics;
   }
 
-  public void addSubscription(String consumerId, Set<String> topics) {
+  public void addSubscription(final String consumerId, final Set<String> 
topics) {
     if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
       throw new SubscriptionException(
           String.format(
@@ -129,7 +157,7 @@ public class ConsumerGroupMeta {
               consumerId, consumerGroupId));
     }
 
-    for (String topic : topics) {
+    for (final String topic : topics) {
       topicNameToSubscribedConsumerIdSet
           .computeIfAbsent(topic, k -> new HashSet<>())
           .add(consumerId);
@@ -139,7 +167,7 @@ public class ConsumerGroupMeta {
   /**
    * @return topics subscribed by no consumers in this group after this removal.
    */
-  public Set<String> removeSubscription(String consumerId, Set<String> topics) 
{
+  public Set<String> removeSubscription(final String consumerId, final 
Set<String> topics) {
     if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
       throw new SubscriptionException(
           String.format(
@@ -147,8 +175,8 @@ public class ConsumerGroupMeta {
               consumerId, consumerGroupId));
     }
 
-    Set<String> noSubscriptionTopicAfterRemoval = new HashSet<>();
-    for (String topic : topics) {
+    final Set<String> noSubscriptionTopicAfterRemoval = new HashSet<>();
+    for (final String topic : topics) {
       if (topicNameToSubscribedConsumerIdSet.containsKey(topic)) {
         topicNameToSubscribedConsumerIdSet.get(topic).remove(consumerId);
         if (topicNameToSubscribedConsumerIdSet.get(topic).isEmpty()) {
@@ -163,33 +191,34 @@ public class ConsumerGroupMeta {
   /////////////////////////////// de/ser ///////////////////////////////
 
   public ByteBuffer serialize() throws IOException {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS();
-    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
+    final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+    final DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
     serialize(outputStream);
     return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
   }
 
-  public void serialize(OutputStream outputStream) throws IOException {
+  public void serialize(final OutputStream outputStream) throws IOException {
     ReadWriteIOUtils.write(consumerGroupId, outputStream);
     ReadWriteIOUtils.write(creationTime, outputStream);
 
     ReadWriteIOUtils.write(topicNameToSubscribedConsumerIdSet.size(), 
outputStream);
-    for (Map.Entry<String, Set<String>> entry : 
topicNameToSubscribedConsumerIdSet.entrySet()) {
+    for (final Map.Entry<String, Set<String>> entry :
+        topicNameToSubscribedConsumerIdSet.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       ReadWriteIOUtils.write(entry.getValue().size(), outputStream);
-      for (String id : entry.getValue()) {
+      for (final String id : entry.getValue()) {
         ReadWriteIOUtils.write(id, outputStream);
       }
     }
 
     ReadWriteIOUtils.write(consumerIdToConsumerMeta.size(), outputStream);
-    for (Map.Entry<String, ConsumerMeta> entry : 
consumerIdToConsumerMeta.entrySet()) {
+    for (final Map.Entry<String, ConsumerMeta> entry : 
consumerIdToConsumerMeta.entrySet()) {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       entry.getValue().serialize(outputStream);
     }
   }
 
-  public static ConsumerGroupMeta deserialize(InputStream inputStream) throws 
IOException {
+  public static ConsumerGroupMeta deserialize(final InputStream inputStream) 
throws IOException {
     final ConsumerGroupMeta consumerGroupMeta = new ConsumerGroupMeta();
 
     consumerGroupMeta.consumerGroupId = 
ReadWriteIOUtils.readString(inputStream);
@@ -220,7 +249,7 @@ public class ConsumerGroupMeta {
     return consumerGroupMeta;
   }
 
-  public static ConsumerGroupMeta deserialize(ByteBuffer byteBuffer) {
+  public static ConsumerGroupMeta deserialize(final ByteBuffer byteBuffer) {
     final ConsumerGroupMeta consumerGroupMeta = new ConsumerGroupMeta();
 
     consumerGroupMeta.consumerGroupId = 
ReadWriteIOUtils.readString(byteBuffer);
@@ -254,14 +283,14 @@ public class ConsumerGroupMeta {
   /////////////////////////////// Object ///////////////////////////////
 
   @Override
-  public boolean equals(Object obj) {
+  public boolean equals(final Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
-    ConsumerGroupMeta that = (ConsumerGroupMeta) obj;
+    final ConsumerGroupMeta that = (ConsumerGroupMeta) obj;
     return Objects.equals(consumerGroupId, that.consumerGroupId)
         && creationTime == that.creationTime
         && Objects.equals(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
index a0e80db04e0..70666a07182 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/topic/TopicMeta.java
@@ -185,6 +185,8 @@ public class TopicMeta {
     
extractorAttributes.putAll(config.getAttributesWithTimeRange(creationTime));
     // realtime mode
     extractorAttributes.putAll(config.getAttributesWithRealtimeMode());
+    // source mode
+    extractorAttributes.putAll(config.getAttributesWithSourceMode());
     return extractorAttributes;
   }
 

Reply via email to