This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new fdf847cc19f Subscription: support drop subscription from session & intro allTopicMessagesHaveBeenConsumed for pull consumer (#15486) (#15568)
fdf847cc19f is described below

commit fdf847cc19f21fa1d0a608e76a570ce6ebc65eb7
Author: VGalaxies <vgalax...@apache.org>
AuthorDate: Fri May 23 14:20:10 2025 +0800

    Subscription: support drop subscription from session & intro allTopicMessagesHaveBeenConsumed for pull consumer (#15486) (#15568)
    
    - Introducing a new DropSubscriptionTask and corresponding updates across executor interfaces, cluster executors, and managers.
    - Updating the SQL parser to recognize the DROP SUBSCRIPTION statement.
    - Enhancing client session APIs and internal data models to support subscription deletion.
---
 .../apache/iotdb/SubscriptionSessionExample.java   |  4 +-
 .../it/dual/IoTDBSubscriptionTopicIT.java          |  2 +-
 .../it/local/IoTDBSubscriptionBasicIT.java         | 97 ++++++++++++++++++++++
 .../session/subscription/SubscriptionSession.java  | 19 ++++-
 .../consumer/SubscriptionConsumer.java             |  5 ++
 .../session/subscription/model/Subscription.java   | 16 +++-
 .../org/apache/iotdb/db/qp/sql/IdentifierParser.g4 |  1 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   | 12 +--
 .../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4  |  4 +
 .../subscription/SubscriptionTableResp.java        | 34 ++++----
 .../iotdb/confignode/manager/ConfigManager.java    |  9 ++
 .../apache/iotdb/confignode/manager/IManager.java  |  3 +
 .../subscription/SubscriptionCoordinator.java      | 27 ++++++
 .../persistence/subscription/SubscriptionInfo.java | 56 ++++++++++---
 .../thrift/ConfigNodeRPCServiceProcessor.java      |  6 ++
 .../org/apache/iotdb/db/audit/AuditLogger.java     |  1 +
 .../iotdb/db/protocol/client/ConfigNodeClient.java |  7 ++
 .../common/header/ColumnHeaderConstant.java        |  2 +
 .../plan/execution/config/ConfigTaskVisitor.java   | 22 +++--
 .../config/executor/ClusterConfigTaskExecutor.java | 27 +++++-
 .../config/executor/IConfigTaskExecutor.java       |  4 +
 .../sys/subscription/DropSubscriptionTask.java     | 42 ++++++++++
 ...riptionTask.java => ShowSubscriptionsTask.java} | 17 +++-
 .../db/queryengine/plan/parser/ASTVisitor.java     | 17 ++++
 .../queryengine/plan/statement/StatementType.java  |  3 +-
 .../plan/statement/StatementVisitor.java           |  5 ++
 .../subscription/DropSubscriptionStatement.java    | 86 +++++++++++++++++++
 iotdb-core/node-commons/pom.xml                    |  4 +
 .../meta/consumer/ConsumerGroupMeta.java           | 95 ++++++++++++++++++---
 .../meta/consumer/ConsumerGroupMetaKeeper.java     |  7 ++
 .../meta/subscription/SubscriptionMeta.java        | 84 ++++---------------
 .../src/main/thrift/confignode.thrift              | 11 ++-
 32 files changed, 601 insertions(+), 128 deletions(-)

diff --git 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
index 1ec74a34a29..c01797982e0 100644
--- 
a/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
+++ 
b/example/session/src/main/java/org/apache/iotdb/SubscriptionSessionExample.java
@@ -268,7 +268,7 @@ public class SubscriptionSessionExample {
                         .buildPushConsumer()) {
                   consumer3.open();
                   consumer3.subscribe(TOPIC_3);
-                  while 
(!consumer3.allSnapshotTopicMessagesHaveBeenConsumed()) {
+                  while (!consumer3.allTopicMessagesHaveBeenConsumed()) {
                     LockSupport.parkNanos(SLEEP_NS); // wait some time
                   }
                 }
@@ -309,7 +309,7 @@ public class SubscriptionSessionExample {
                         .buildPullConsumer()) {
                   consumer4.open();
                   consumer4.subscribe(TOPIC_4);
-                  while 
(!consumer4.allSnapshotTopicMessagesHaveBeenConsumed()) {
+                  while (!consumer4.allTopicMessagesHaveBeenConsumed()) {
                     for (final SubscriptionMessage message : 
consumer4.poll(POLL_TIMEOUT_MS)) {
                       final SubscriptionTsFileHandler handler = 
message.getTsFileHandler();
                       handler.moveFile(
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index 723090d433e..b046f09a6da 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -859,7 +859,7 @@ public class IoTDBSubscriptionTopicIT extends 
AbstractSubscriptionDualIT {
                 consumer.open();
                 consumer.subscribe(topicName);
 
-                while (!consumer.allSnapshotTopicMessagesHaveBeenConsumed()) {
+                while (!consumer.allTopicMessagesHaveBeenConsumed()) {
                   LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
                   consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); 
// poll and ignore
                 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index 014b38ec25b..a6750b0ff19 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -19,10 +19,14 @@
 
 package org.apache.iotdb.subscription.it.local;
 
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
 import org.apache.iotdb.isession.ISession;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.subscription.config.TopicConstant;
 import org.apache.iotdb.session.subscription.SubscriptionSession;
 import org.apache.iotdb.session.subscription.consumer.AckStrategy;
@@ -30,6 +34,7 @@ import 
org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
 import org.apache.iotdb.session.subscription.consumer.ConsumeResult;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
 import org.apache.iotdb.session.subscription.consumer.SubscriptionPushConsumer;
+import org.apache.iotdb.session.subscription.model.Subscription;
 import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
 import 
org.apache.iotdb.session.subscription.payload.SubscriptionSessionDataSet;
 import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
@@ -51,6 +56,7 @@ import java.time.Duration;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -621,4 +627,95 @@ public class IoTDBSubscriptionBasicIT extends 
AbstractSubscriptionLocalIT {
       fail(e.getMessage());
     }
   }
+
+  @Test
+  public void testDropSubscriptionBySession() throws Exception {
+    // Insert some historical data
+    try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+      for (int i = 0; i < 100; ++i) {
+        session.executeNonQueryStatement(
+            String.format("insert into root.db.d1(time, s1) values (%s, 1)", 
i));
+      }
+      session.executeNonQueryStatement("flush");
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Create topic
+    final String topicName = "topic8";
+    final String host = EnvFactory.getEnv().getIP();
+    final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      session.createTopic(topicName);
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    // Subscription
+    final Thread thread =
+        new Thread(
+            () -> {
+              try (final SubscriptionPullConsumer consumer =
+                  new SubscriptionPullConsumer.Builder()
+                      .host(host)
+                      .port(port)
+                      .consumerId("c1")
+                      .consumerGroupId("cg1")
+                      .autoCommit(true)
+                      .buildPullConsumer()) {
+                consumer.open();
+                consumer.subscribe(topicName);
+
+                while (!consumer.allTopicMessagesHaveBeenConsumed()) {
+                  LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS); 
// wait some time
+                  consumer.poll(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS); 
// poll and ignore
+                }
+              } catch (final Exception e) {
+                e.printStackTrace();
+                // Avoid failure
+              } finally {
+                LOGGER.info("consumer exiting...");
+              }
+            },
+            String.format("%s - consumer", testName.getDisplayName()));
+    thread.start();
+
+    // Drop Subscription
+    LockSupport.parkNanos(5_000_000_000L); // wait some time
+    try (final SubscriptionSession session = new SubscriptionSession(host, 
port)) {
+      session.open();
+      final Set<Subscription> subscriptions = 
session.getSubscriptions(topicName);
+      Assert.assertEquals(1, subscriptions.size());
+      
session.dropSubscription(subscriptions.iterator().next().getSubscriptionId());
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+
+    try {
+      // Keep retrying if there are execution failures
+      AWAIT.untilAsserted(
+          () -> {
+            // Check empty subscription
+            try (final SyncConfigNodeIServiceClient client =
+                (SyncConfigNodeIServiceClient)
+                    EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+              final TShowSubscriptionResp showSubscriptionResp =
+                  client.showSubscription(new TShowSubscriptionReq());
+              Assert.assertEquals(
+                  RpcUtils.SUCCESS_STATUS.getCode(), 
showSubscriptionResp.status.getCode());
+              Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+              Assert.assertEquals(0, 
showSubscriptionResp.subscriptionInfoList.size());
+            }
+          });
+    } catch (final Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      thread.join();
+    }
+  }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 68e2ac0028c..2f2bc40d951 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -266,6 +266,20 @@ public class SubscriptionSession extends Session {
     }
   }
 
+  public void dropSubscription(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the 
parse result
+    final String sql = String.format("DROP SUBSCRIPTION %s", subscriptionId);
+    executeNonQueryStatement(sql);
+  }
+
+  public void dropSubscriptionIfExists(final String subscriptionId)
+      throws IoTDBConnectionException, StatementExecutionException {
+    IdentifierUtils.checkAndParseIdentifier(subscriptionId); // ignore the 
parse result
+    final String sql = String.format("DROP SUBSCRIPTION IF EXISTS %s", 
subscriptionId);
+    executeNonQueryStatement(sql);
+  }
+
   /////////////////////////////// utility ///////////////////////////////
 
   public Set<Topic> convertDataSetToTopics(final SessionDataSet dataSet)
@@ -291,7 +305,7 @@ public class SubscriptionSession extends Session {
     while (dataSet.hasNext()) {
       final RowRecord record = dataSet.next();
       final List<Field> fields = record.getFields();
-      if (fields.size() != 3) {
+      if (fields.size() != 4) {
         throw new SubscriptionException(
             String.format(
                 "Unexpected fields %s was obtained during SHOW 
SUBSCRIPTION...",
@@ -301,7 +315,8 @@ public class SubscriptionSession extends Session {
           new Subscription(
               fields.get(0).getStringValue(),
               fields.get(1).getStringValue(),
-              fields.get(2).getStringValue()));
+              fields.get(2).getStringValue(),
+              fields.get(3).getStringValue()));
     }
     return subscriptions;
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 8cbe1b23cb3..7add29b4d7c 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -123,10 +123,15 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
   @SuppressWarnings("java:S3077")
   protected volatile Map<String, TopicConfig> subscribedTopics = new 
HashMap<>();
 
+  @Deprecated
   public boolean allSnapshotTopicMessagesHaveBeenConsumed() {
     return allTopicMessagesHaveBeenConsumed(subscribedTopics.keySet());
   }
 
+  public boolean allTopicMessagesHaveBeenConsumed() {
+    return allTopicMessagesHaveBeenConsumed(subscribedTopics.keySet());
+  }
+
   private boolean allTopicMessagesHaveBeenConsumed(final Collection<String> 
topicNames) {
     // For the topic that needs to be detected, there are two scenarios to 
consider:
     //   1. If configs as live, it cannot be determined whether the topic has 
been fully consumed.
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
index e5f227be17b..01e454cae2c 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
@@ -21,16 +21,26 @@ package org.apache.iotdb.session.subscription.model;
 
 public class Subscription {
 
+  private final String subscriptionId;
   private final String topicName;
   private final String consumerGroupId;
   private final String consumerIds;
 
-  public Subscription(String topicName, String consumerGroupId, String 
consumerIds) {
+  public Subscription(
+      final String subscriptionId,
+      final String topicName,
+      final String consumerGroupId,
+      final String consumerIds) {
+    this.subscriptionId = subscriptionId;
     this.topicName = topicName;
     this.consumerGroupId = consumerGroupId;
     this.consumerIds = consumerIds;
   }
 
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
   public String getTopicName() {
     return topicName;
   }
@@ -45,7 +55,9 @@ public class Subscription {
 
   @Override
   public String toString() {
-    return "Subscription{topicName="
+    return "Subscription{subscriptionId="
+        + subscriptionId
+        + ", topicName="
         + topicName
         + ", consumerGroupId="
         + consumerGroupId
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
index 04f603857a4..632911fb8af 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IdentifierParser.g4
@@ -221,6 +221,7 @@ keyWords
     | STATELESS
     | STATEMENT
     | STOP
+    | SUBSCRIPTION
     | SUBSCRIPTIONS
     | SUBSTRING
     | SYSTEM
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index dc88779f76e..02b5062cb02 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -56,10 +56,8 @@ ddlStatement
     | createPipe | alterPipe | dropPipe | startPipe | stopPipe | showPipes
     // Pipe Plugin
     | createPipePlugin | dropPipePlugin | showPipePlugins
-    // TOPIC
-    | createTopic | dropTopic | showTopics
     // Subscription
-    | showSubscriptions
+    | createTopic | dropTopic | showTopics | showSubscriptions | 
dropSubscription
     // CQ
     | createContinuousQuery | dropContinuousQuery | showContinuousQueries
     // Cluster
@@ -667,7 +665,8 @@ showPipePlugins
     : SHOW PIPEPLUGINS
     ;
 
-// Topic 
=========================================================================================
+
+// Subscription 
=========================================================================================
 createTopic
     : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause?
     ;
@@ -688,11 +687,14 @@ showTopics
     : SHOW ((TOPIC topicName=identifier) | TOPICS )
     ;
 
-// Subscriptions 
=========================================================================================
 showSubscriptions
     : SHOW SUBSCRIPTIONS (ON topicName=identifier)?
     ;
 
+dropSubscription
+    : DROP SUBSCRIPTION (IF EXISTS)? subscriptionId=identifier
+    ;
+
 // AI Model 
=========================================================================================
 // ---- Create Model
 createModel
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 10d0da69de4..f90a0d3dbfd 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -790,6 +790,10 @@ STOP
     : S T O P
     ;
 
+SUBSCRIPTION
+    : S U B S C R I P T I O N
+    ;
+
 SUBSCRIPTIONS
     : S U B S C R I P T I O N S
     ;
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
index d6679ea4c11..e3c336cb5b8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/response/subscription/SubscriptionTableResp.java
@@ -31,6 +31,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
 
 public class SubscriptionTableResp implements DataSet {
   private final TSStatus status;
@@ -47,29 +50,30 @@ public class SubscriptionTableResp implements DataSet {
   }
 
   public SubscriptionTableResp filter(String topicName) {
-    if (topicName == null) {
-      return this;
-    } else {
-      final List<SubscriptionMeta> filteredSubscriptionMeta = new 
ArrayList<>();
-      for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
-        if (subscriptionMeta.getTopicName().equals(topicName)) {
-          filteredSubscriptionMeta.add(subscriptionMeta);
-          break;
-        }
-      }
-      return new SubscriptionTableResp(status, filteredSubscriptionMeta, 
allConsumerGroupMeta);
-    }
+    return new SubscriptionTableResp(
+        status,
+        allSubscriptionMeta.stream()
+            .filter(
+                subscriptionMeta ->
+                    (Objects.isNull(topicName)
+                        || Objects.equals(
+                            subscriptionMeta.getTopicMeta().getTopicName(), 
topicName)))
+            .collect(Collectors.toList()),
+        allConsumerGroupMeta);
   }
 
   public TShowSubscriptionResp convertToTShowSubscriptionResp() {
     final List<TShowSubscriptionInfo> showSubscriptionInfoList = new 
ArrayList<>();
 
     for (SubscriptionMeta subscriptionMeta : allSubscriptionMeta) {
-      showSubscriptionInfoList.add(
+      TShowSubscriptionInfo showSubscriptionInfo =
           new TShowSubscriptionInfo(
-              subscriptionMeta.getTopicName(),
+              subscriptionMeta.getTopicMeta().getTopicName(),
               subscriptionMeta.getConsumerGroupId(),
-              subscriptionMeta.getConsumerIds()));
+              subscriptionMeta.getConsumerIds());
+      Optional<Long> creationTime = subscriptionMeta.getCreationTime();
+      creationTime.ifPresent(showSubscriptionInfo::setCreationTime);
+      showSubscriptionInfoList.add(showSubscriptionInfo);
     }
     return new 
TShowSubscriptionResp(status).setSubscriptionInfoList(showSubscriptionInfoList);
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 2e831747c9e..0a207ab7238 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -161,6 +161,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -2241,6 +2242,14 @@ public class ConfigManager implements IManager {
         : status;
   }
 
+  @Override
+  public TSStatus dropSubscriptionById(TDropSubscriptionReq req) {
+    TSStatus status = confirmLeader();
+    return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+        ? 
subscriptionManager.getSubscriptionCoordinator().dropSubscription(req)
+        : status;
+  }
+
   @Override
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) {
     TSStatus status = confirmLeader();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 6fd0a8b567c..657f068e674 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -752,6 +753,8 @@ public interface IManager {
 
   TSStatus dropSubscription(TUnsubscribeReq req);
 
+  TSStatus dropSubscriptionById(TDropSubscriptionReq req);
+
   TShowSubscriptionResp showSubscription(TShowSubscriptionReq req);
 
   TGetAllSubscriptionInfoResp getAllSubscriptionInfo();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index 20dfce44bf2..28c596de7ba 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
 import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp;
@@ -42,10 +43,12 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class SubscriptionCoordinator {
@@ -248,6 +251,30 @@ public class SubscriptionCoordinator {
     return status;
   }
 
+  public TSStatus dropSubscription(TDropSubscriptionReq req) {
+    final String subscriptionId = req.getSubsciptionId();
+    final boolean isSetIfExistsCondition =
+        req.isSetIfExistsCondition() && req.isIfExistsCondition();
+    final Optional<Pair<String, String>> topicNameWithConsumerGroupName =
+        subscriptionInfo.parseSubscriptionId(subscriptionId);
+    if (!topicNameWithConsumerGroupName.isPresent()) {
+      return isSetIfExistsCondition
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : RpcUtils.getStatus(
+              TSStatusCode.TOPIC_NOT_EXIST_ERROR,
+              String.format(
+                  "Failed to drop subscription %s. Failures: %s does not 
exist.",
+                  subscriptionId, subscriptionId));
+    }
+    return configManager
+        .getProcedureManager()
+        .dropSubscription(
+            new TUnsubscribeReq()
+                .setConsumerId(null)
+                .setConsumerGroupId(topicNameWithConsumerGroupName.get().right)
+                
.setTopicNames(Collections.singleton(topicNameWithConsumerGroupName.get().left)));
+  }
+
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) {
     try {
       return ((SubscriptionTableResp)
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index 2ccde563866..9a1c6acc72a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -44,6 +44,8 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 
+import org.apache.thrift.annotation.Nullable;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,9 +56,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
@@ -603,15 +607,18 @@ public class SubscriptionInfo implements 
SnapshotProcessor {
   private void checkBeforeUnsubscribeInternal(TUnsubscribeReq unsubscribeReq)
       throws SubscriptionException {
     // 1. Check if the consumer exists
-    if (!isConsumerExisted(unsubscribeReq.getConsumerGroupId(), 
unsubscribeReq.getConsumerId())) {
-      // There is no consumer with the same consumerId and consumerGroupId,
-      // we should end the procedure
-      final String exceptionMessage =
-          String.format(
-              "Failed to unsubscribe because the consumer %s in consumer group 
%s does not exist",
-              unsubscribeReq.getConsumerId(), 
unsubscribeReq.getConsumerGroupId());
-      LOGGER.warn(exceptionMessage);
-      throw new SubscriptionException(exceptionMessage);
+    // NOTE: consumer id may be null if drop subscription by session
+    if (Objects.nonNull(unsubscribeReq.getConsumerId())) {
+      if (!isConsumerExisted(unsubscribeReq.getConsumerGroupId(), 
unsubscribeReq.getConsumerId())) {
+        // There is no consumer with the same consumerId and consumerGroupId,
+        // we should end the procedure
+        final String exceptionMessage =
+            String.format(
+                "Failed to unsubscribe because the consumer %s in consumer 
group %s does not exist",
+                unsubscribeReq.getConsumerId(), 
unsubscribeReq.getConsumerGroupId());
+        LOGGER.warn(exceptionMessage);
+        throw new SubscriptionException(exceptionMessage);
+      }
     }
 
     // 2. Check if all topics exist. No need to check if already subscribed.
@@ -638,17 +645,28 @@ public class SubscriptionInfo implements 
SnapshotProcessor {
   }
 
   private List<SubscriptionMeta> getAllSubscriptionMeta() {
+    return getAllSubscriptionMetaInternal(null);
+  }
+
+  private List<SubscriptionMeta> getAllSubscriptionMetaInternal(
+      @Nullable Predicate<TopicMeta> predicate) {
     List<SubscriptionMeta> allSubscriptions = new ArrayList<>();
     for (TopicMeta topicMeta : topicMetaKeeper.getAllTopicMeta()) {
+      if (Objects.nonNull(predicate) && !predicate.test(topicMeta)) {
+        continue;
+      }
       for (String consumerGroupId :
           
consumerGroupMetaKeeper.getSubscribedConsumerGroupIds(topicMeta.getTopicName()))
 {
         Set<String> subscribedConsumerIDs =
             consumerGroupMetaKeeper.getConsumersSubscribingTopic(
                 consumerGroupId, topicMeta.getTopicName());
+        Optional<Long> creationTime =
+            consumerGroupMetaKeeper.getSubscriptionCreationTime(
+                consumerGroupId, topicMeta.getTopicName());
         if (!subscribedConsumerIDs.isEmpty()) {
           allSubscriptions.add(
               new SubscriptionMeta(
-                  topicMeta.getTopicName(), consumerGroupId, 
subscribedConsumerIDs));
+                  topicMeta, consumerGroupId, subscribedConsumerIDs, 
creationTime.orElse(null)));
         }
       }
     }
@@ -661,6 +679,24 @@ public class SubscriptionInfo implements SnapshotProcessor 
{
         .collect(Collectors.toList());
   }
 
+  /**
+   * Resolves a subscription id to the (topic name, consumer group id) pair it denotes.
+   *
+   * <p>All current subscription metas are scanned under the read lock and the first one whose
+   * subscription id equals the given id is used.
+   *
+   * @param subscriptionId the subscription id to look up
+   * @return the matching (topic name, consumer group id) pair, or empty when nothing matches
+   */
+  public Optional<Pair<String, String>> parseSubscriptionId(String subscriptionId) {
+    acquireReadLock();
+    try {
+      return getAllSubscriptionMetaInternal(null).stream()
+          .filter(meta -> Objects.equals(subscriptionId, meta.getSubscriptionId()))
+          .findFirst()
+          .map(
+              matched ->
+                  new Pair<>(
+                      matched.getTopicMeta().getTopicName(), matched.getConsumerGroupId()));
+    } finally {
+      releaseReadLock();
+    }
+  }
+
   /////////////////////////////////  Snapshot  
/////////////////////////////////
 
   @Override
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 0c2d0633c76..0daf96f0d25 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -130,6 +130,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -1182,6 +1183,11 @@ public class ConfigNodeRPCServiceProcessor implements 
IConfigNodeRPCService.Ifac
     return configManager.dropSubscription(req);
   }
 
+  /** Forwards the drop-subscription-by-id request to the config manager and returns its status. */
+  @Override
+  public TSStatus dropSubscriptionById(TDropSubscriptionReq req) {
+    return configManager.dropSubscriptionById(req);
+  }
+
   @Override
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) {
     return configManager.showSubscription(req);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
index bb5f557ec0e..ed3efee9a1b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/audit/AuditLogger.java
@@ -210,6 +210,7 @@ public class AuditLogger {
       case RENAME_LOGICAL_VIEW:
       case CREATE_TOPIC:
       case DROP_TOPIC:
+      case DROP_SUBSCRIPTION:
         return AuditLogOperation.DDL;
       case LOAD_DATA:
       case INSERT:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index f809c92be5c..979c18320ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -92,6 +92,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -1113,6 +1114,12 @@ public class ConfigNodeClient implements 
IConfigNodeRPCService.Iface, ThriftClie
         () -> client.dropSubscription(req), status -> 
!updateConfigNodeLeader(status));
   }
 
+  /**
+   * Issues the drop-subscription-by-id call through executeRemoteCallWithRetry.
+   *
+   * <p>NOTE(review): the second lambda is the negation of updateConfigNodeLeader(status);
+   * presumably it tells the retry helper whether the returned status is final - confirm against
+   * executeRemoteCallWithRetry.
+   */
+  @Override
+  public TSStatus dropSubscriptionById(TDropSubscriptionReq req) throws 
TException {
+    return executeRemoteCallWithRetry(
+        () -> client.dropSubscriptionById(req), status -> 
!updateConfigNodeLeader(status));
+  }
+
   @Override
   public TShowSubscriptionResp showSubscription(TShowSubscriptionReq req) 
throws TException {
     return executeRemoteCallWithRetry(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
index 18c3908f13d..0f26c42e39d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/header/ColumnHeaderConstant.java
@@ -127,6 +127,7 @@ public class ColumnHeaderConstant {
   // column names for show subscriptions statement
   public static final String CONSUMER_GROUP_NAME = "ConsumerGroupName";
   public static final String SUBSCRIBED_CONSUMERS = "SubscribedConsumers";
+  public static final String SUBSCRIPTION_ID = "SubscriptionID";
 
   // show cluster status
   public static final String NODE_TYPE_CONFIG_NODE = "ConfigNode";
@@ -455,6 +456,7 @@ public class ColumnHeaderConstant {
 
+  // Column order: SubscriptionID, then topic name, consumer group name, subscribed consumers.
+  // NOTE(review): code that fills these columns by index must follow the same order - confirm
+  // against the TsBlock builder used for the show subscriptions statement.
   public static final List<ColumnHeader> showSubscriptionColumnHeaders =
       ImmutableList.of(
+          new ColumnHeader(SUBSCRIPTION_ID, TSDataType.TEXT),
           new ColumnHeader(TOPIC_NAME, TSDataType.TEXT),
           new ColumnHeader(CONSUMER_GROUP_NAME, TSDataType.TEXT),
           new ColumnHeader(SUBSCRIBED_CONSUMERS, TSDataType.TEXT));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
index 21caaa6e25f..7a0556007ca 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigTaskVisitor.java
@@ -96,8 +96,9 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.SetThrott
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.CreateTopicTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropSubscriptionTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.DropTopicTask;
-import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
@@ -149,6 +150,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -516,12 +518,6 @@ public class ConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQueryCon
     return new StopPipeTask(stopPipeStatement);
   }
 
-  @Override
-  public IConfigTask visitShowSubscriptions(
-      ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext 
context) {
-    return new ShowSubscriptionTask(showSubscriptionsStatement);
-  }
-
   public IConfigTask visitCreateTopic(
       CreateTopicStatement createTopicStatement, MPPQueryContext context) {
     return new CreateTopicTask(createTopicStatement);
@@ -539,6 +535,18 @@ public class ConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQueryCon
     return new ShowTopicsTask(showTopicsStatement);
   }
 
+  /** Wraps the show-subscriptions statement in a ShowSubscriptionsTask; the context is unused. */
+  @Override
+  public IConfigTask visitShowSubscriptions(
+      ShowSubscriptionsStatement showSubscriptionsStatement, MPPQueryContext 
context) {
+    return new ShowSubscriptionsTask(showSubscriptionsStatement);
+  }
+
+  /** Wraps the drop-subscription statement in a DropSubscriptionTask; the context is unused. */
+  @Override
+  public IConfigTask visitDropSubscription(
+      DropSubscriptionStatement dropSubscriptionStatement, MPPQueryContext 
context) {
+    return new DropSubscriptionTask(dropSubscriptionStatement);
+  }
+
   @Override
   public IConfigTask visitDeleteTimeSeries(
       DeleteTimeSeriesStatement deleteTimeSeriesStatement, MPPQueryContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index d2b5d0bbe77..abdecf6ba73 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -84,6 +84,7 @@ import 
org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropModelReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropSubscriptionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TExtendRegionReq;
@@ -173,7 +174,7 @@ import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.TestConnectionT
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.pipe.ShowPipeTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowSpaceQuotaTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.quota.ShowThrottleQuotaTask;
-import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowSubscriptionsTask;
 import 
org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription.ShowTopicsTask;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import 
org.apache.iotdb.db.queryengine.plan.expression.visitor.TransformToViewExpressionVisitor;
@@ -212,6 +213,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -2098,7 +2100,7 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
         return future;
       }
 
-      ShowSubscriptionTask.buildTSBlock(
+      ShowSubscriptionsTask.buildTSBlock(
           showSubscriptionResp.isSetSubscriptionInfoList()
               ? showSubscriptionResp.getSubscriptionInfoList()
               : Collections.emptyList(),
@@ -2109,6 +2111,27 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     return future;
   }
 
+  /**
+   * Drops the subscription named by the statement by calling dropSubscriptionById on a borrowed
+   * ConfigNode client.
+   *
+   * @param dropSubscriptionStatement statement providing the subscription id and the IF EXISTS
+   *     flag that are copied into the request
+   * @return a future that is set to a SUCCESS_STATUS result when the returned status code is
+   *     SUCCESS_STATUS, fails with an IoTDBException built from the returned status otherwise,
+   *     and fails with the caught exception if borrowing the client or the call itself throws
+   */
+  @Override
+  public SettableFuture<ConfigTaskResult> dropSubscription(
+      final DropSubscriptionStatement dropSubscriptionStatement) {
+    final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    // try-with-resources hands the borrowed client back once the call has finished.
+    try (ConfigNodeClient configNodeClient =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TSStatus tsStatus =
+          configNodeClient.dropSubscriptionById(
+              new TDropSubscriptionReq()
+                  // "Subsciption" (sic): the setter name is taken as-is from TDropSubscriptionReq.
+                  .setSubsciptionId(dropSubscriptionStatement.getSubscriptionId())
+                  .setIfExistsCondition(dropSubscriptionStatement.hasIfExistsCondition()));
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
+        future.setException(new IoTDBException(tsStatus.message, tsStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (Exception e) {
+      // Failures are reported through the future instead of being thrown to the caller.
+      future.setException(e);
+    }
+    return future;
+  }
+
   @Override
   public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement 
createTopicStatement) {
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index e05457d9cf8..0b800a949a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -62,6 +62,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -195,6 +196,9 @@ public interface IConfigTaskExecutor {
   SettableFuture<ConfigTaskResult> showSubscriptions(
       ShowSubscriptionsStatement showSubscriptionsStatement);
 
+  /**
+   * Executes a drop-subscription statement.
+   *
+   * @param dropSubscriptionStatement the statement describing the subscription to drop
+   * @return a future carrying the config task result
+   */
+  SettableFuture<ConfigTaskResult> dropSubscription(
+      DropSubscriptionStatement dropSubscriptionStatement);
+
   SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement 
createTopicStatement);
 
   SettableFuture<ConfigTaskResult> dropTopic(DropTopicStatement 
dropTopicStatement);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java
new file mode 100644
index 00000000000..1c8725d9b43
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/DropSubscriptionTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.sys.subscription;
+
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import 
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/** Config task that hands a drop-subscription statement to the config task executor. */
+public class DropSubscriptionTask implements IConfigTask {
+
+  /** The statement this task executes. */
+  private final DropSubscriptionStatement statement;
+
+  /**
+   * @param dropSubscriptionStatement the statement to pass to the executor on execution
+   */
+  public DropSubscriptionTask(final DropSubscriptionStatement dropSubscriptionStatement) {
+    statement = dropSubscriptionStatement;
+  }
+
+  /**
+   * Delegates to the executor's dropSubscription with the stored statement.
+   *
+   * @param configTaskExecutor the executor that performs the drop
+   * @return the future returned by the executor
+   */
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return configTaskExecutor.dropSubscription(statement);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
similarity index 85%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
index 29fe87699f5..982e8e33e95 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionTask.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/subscription/ShowSubscriptionsTask.java
@@ -38,11 +38,11 @@ import org.apache.tsfile.utils.Binary;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class ShowSubscriptionTask implements IConfigTask {
+public class ShowSubscriptionsTask implements IConfigTask {
 
   private final ShowSubscriptionsStatement showSubscriptionsStatement;
 
-  public ShowSubscriptionTask(ShowSubscriptionsStatement 
showSubscriptionsStatement) {
+  public ShowSubscriptionsTask(ShowSubscriptionsStatement 
showSubscriptionsStatement) {
     this.showSubscriptionsStatement = showSubscriptionsStatement;
   }
 
@@ -62,15 +62,24 @@ public class ShowSubscriptionTask implements IConfigTask {
 
     for (TShowSubscriptionInfo tSubscriptionInfo : subscriptionInfoList) {
       builder.getTimeColumnBuilder().writeLong(0L);
+      final StringBuilder subscriptionId =
+          new StringBuilder(
+              tSubscriptionInfo.getTopicName() + "_" + 
tSubscriptionInfo.getConsumerGroupId());
+      if (tSubscriptionInfo.getCreationTime() != 0) {
+        subscriptionId.append("_").append(tSubscriptionInfo.getCreationTime());
+      }
       builder
           .getColumnBuilder(0)
-          .writeBinary(new Binary(tSubscriptionInfo.getTopicName(), 
TSFileConfig.STRING_CHARSET));
+          .writeBinary(new Binary(subscriptionId.toString(), 
TSFileConfig.STRING_CHARSET));
       builder
           .getColumnBuilder(1)
+          .writeBinary(new Binary(tSubscriptionInfo.getTopicName(), 
TSFileConfig.STRING_CHARSET));
+      builder
+          .getColumnBuilder(2)
           .writeBinary(
               new Binary(tSubscriptionInfo.getConsumerGroupId(), 
TSFileConfig.STRING_CHARSET));
       builder
-          .getColumnBuilder(2)
+          .getColumnBuilder(3)
           .writeBinary(
               new Binary(
                   tSubscriptionInfo.getConsumerIds().toString(), 
TSFileConfig.STRING_CHARSET));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 4b35f14fcfd..d33eaa953d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -186,6 +186,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -4037,6 +4038,22 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
     return showSubscriptionsStatement;
   }
 
+  /**
+   * Builds a DropSubscriptionStatement from the parsed DROP SUBSCRIPTION context.
+   *
+   * @param ctx parse context; its subscriptionId must be present
+   * @return the statement with the parsed subscription id and the IF EXISTS flag set
+   * @throws SemanticException if the context carries no subscription id
+   */
+  @Override
+  public Statement visitDropSubscription(IoTDBSqlParser.DropSubscriptionContext ctx) {
+    // Guard: a subscription id is required.
+    if (ctx.subscriptionId == null) {
+      throw new SemanticException(
+          "Not support for this sql in DROP SUBSCRIPTION, please enter subscriptionId.");
+    }
+
+    final DropSubscriptionStatement statement = new DropSubscriptionStatement();
+    statement.setSubscriptionId(parseIdentifier(ctx.subscriptionId.getText()));
+
+    // IF EXISTS counts only when both keywords are present.
+    final boolean ifExists = ctx.IF() != null && ctx.EXISTS() != null;
+    statement.setIfExists(ifExists);
+    return statement;
+  }
+
   @Override
   public Statement visitGetRegionId(IoTDBSqlParser.GetRegionIdContext ctx) {
     TConsensusGroupType type =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 4f49a5909e7..4d5fe5dbf3f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -182,7 +182,8 @@ public enum StatementType {
   CREATE_TOPIC,
   DROP_TOPIC,
   SHOW_TOPICS,
-
   SHOW_SUBSCRIPTIONS,
+  DROP_SUBSCRIPTION,
+
   SET_CONFIGURATION
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 7af95f40a9a..1792d4eb268 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -93,6 +93,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.MigrateReg
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.ReconstructRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.region.RemoveRegionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.CreateTopicStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropSubscriptionStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.DropTopicStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowSubscriptionsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription.ShowTopicsStatement;
@@ -560,6 +561,10 @@ public abstract class StatementVisitor<R, C> {
     return visitStatement(showSubscriptionsStatement, context);
   }
 
+  /** Default handling for drop-subscription statements: falls back to visitStatement. */
+  public R visitDropSubscription(DropSubscriptionStatement 
dropSubscriptionStatement, C context) {
+    return visitStatement(dropSubscriptionStatement, context);
+  }
+
   public R visitGetRegionId(GetRegionIdStatement getRegionIdStatement, C 
context) {
     return visitStatement(getRegionIdStatement, context);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java
new file mode 100644
index 00000000000..73284fb6de5
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropSubscriptionStatement.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.metadata.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Statement for dropping a subscription: holds the target subscription id and whether an IF
+ * EXISTS condition was given.
+ */
+public class DropSubscriptionStatement extends Statement implements IConfigStatement {
+
+  private String subscriptionId;
+  private boolean ifExistsCondition;
+
+  /** Creates an empty statement typed as DROP_SUBSCRIPTION. */
+  public DropSubscriptionStatement() {
+    super();
+    statementType = StatementType.DROP_SUBSCRIPTION;
+  }
+
+  // ---- subscription id ----
+
+  public String getSubscriptionId() {
+    return subscriptionId;
+  }
+
+  public void setSubscriptionId(String subscriptionId) {
+    this.subscriptionId = subscriptionId;
+  }
+
+  // ---- IF EXISTS flag ----
+
+  public boolean hasIfExistsCondition() {
+    return ifExistsCondition;
+  }
+
+  public void setIfExists(boolean ifExistsCondition) {
+    this.ifExistsCondition = ifExistsCondition;
+  }
+
+  // ---- Statement contract ----
+
+  /** This statement is classified as a write. */
+  @Override
+  public QueryType getQueryType() {
+    return QueryType.WRITE;
+  }
+
+  /** No paths are associated with this statement. */
+  @Override
+  public List<PartialPath> getPaths() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitDropSubscription(this, context);
+  }
+
+  /**
+   * The super user always passes; any other user is checked for the USE_PIPE system privilege.
+   *
+   * @param userName the user issuing the statement
+   * @return a success status for the super user, otherwise the status derived from the check
+   */
+  @Override
+  public TSStatus checkPermissionBeforeProcess(String userName) {
+    if (!AuthorityChecker.SUPER_USER.equals(userName)) {
+      return AuthorityChecker.getTSStatus(
+          AuthorityChecker.checkSystemPermission(userName, PrivilegeType.USE_PIPE.ordinal()),
+          PrivilegeType.USE_PIPE);
+    }
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+}
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 3a5c64b9ecd..10d3c30d85c 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -167,6 +167,10 @@
             <groupId>com.github.luben</groupId>
             <artifactId>zstd-jni</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.checkerframework</groupId>
+            <artifactId>checker-qual</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index f7d4901884c..f8e486537f6 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -21,8 +21,10 @@ package org.apache.iotdb.commons.subscription.meta.consumer;
 
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 
+import org.apache.thrift.annotation.Nullable;
 import org.apache.tsfile.utils.PublicBAOS;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.checkerframework.checker.nullness.qual.NonNull;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -33,8 +35,10 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 
 public class ConsumerGroupMeta {
 
@@ -42,10 +46,12 @@ public class ConsumerGroupMeta {
   private long creationTime;
   private Map<String, Set<String>> topicNameToSubscribedConsumerIdSet;
   private Map<String, ConsumerMeta> consumerIdToConsumerMeta;
+  private Map<String, Long> topicNameToSubscriptionCreationTime; // used when 
creationTime < 0
 
   public ConsumerGroupMeta() {
     this.topicNameToSubscribedConsumerIdSet = new ConcurrentHashMap<>();
     this.consumerIdToConsumerMeta = new ConcurrentHashMap<>();
+    this.topicNameToSubscriptionCreationTime = new ConcurrentHashMap<>();
   }
 
   public ConsumerGroupMeta(
@@ -53,7 +59,7 @@ public class ConsumerGroupMeta {
     this();
 
     this.consumerGroupId = consumerGroupId;
-    this.creationTime = creationTime;
+    this.creationTime = -creationTime;
 
     consumerIdToConsumerMeta.put(firstConsumerMeta.getConsumerId(), 
firstConsumerMeta);
   }
@@ -65,6 +71,8 @@ public class ConsumerGroupMeta {
     copied.topicNameToSubscribedConsumerIdSet =
         new ConcurrentHashMap<>(topicNameToSubscribedConsumerIdSet);
     copied.consumerIdToConsumerMeta = new 
ConcurrentHashMap<>(consumerIdToConsumerMeta);
+    copied.topicNameToSubscriptionCreationTime =
+        new ConcurrentHashMap<>(topicNameToSubscriptionCreationTime);
     return copied;
   }
 
@@ -73,7 +81,11 @@ public class ConsumerGroupMeta {
   }
 
   public long getCreationTime() {
-    return creationTime;
+    return Math.abs(creationTime);
+  }
+
+  private boolean shouldRecordSubscriptionCreationTime() {
+    return creationTime < 0;
   }
 
   public static /* @NonNull */ Set<String> getTopicsUnsubByGroup(
@@ -140,6 +152,12 @@ public class ConsumerGroupMeta {
     return topicNameToSubscribedConsumerIdSet.getOrDefault(topic, 
Collections.emptySet());
   }
 
+  public Optional<Long> getSubscriptionTime(final String topic) {
+    return shouldRecordSubscriptionCreationTime()
+        ? Optional.ofNullable(topicNameToSubscriptionCreationTime.get(topic))
+        : Optional.empty();
+  }
+
   public Set<String> getTopicsSubscribedByConsumer(final String consumerId) {
     final Set<String> topics = new HashSet<>();
     for (final Map.Entry<String, Set<String>> topicNameToSubscribedConsumerId :
@@ -169,15 +187,35 @@ public class ConsumerGroupMeta {
 
     for (final String topic : topics) {
       topicNameToSubscribedConsumerIdSet
-          .computeIfAbsent(topic, k -> new HashSet<>())
+          .computeIfAbsent(
+              topic,
+              k -> {
+                if (shouldRecordSubscriptionCreationTime()) {
+                  topicNameToSubscriptionCreationTime.put(topic, 
System.currentTimeMillis());
+                }
+                return new HashSet<>();
+              })
           .add(consumerId);
     }
   }
 
   /**
    * @return topics subscribed by no consumers in this group after this removal.
+   * @param consumerId if null, remove subscriptions of topics for all consumers
    */
-  public Set<String> removeSubscription(final String consumerId, final 
Set<String> topics) {
+  public Set<String> removeSubscription(
+      @Nullable final String consumerId, final Set<String> topics) {
+    if (Objects.isNull(consumerId)) {
+      return consumerIdToConsumerMeta.keySet().stream()
+          .map(id -> removeSubscriptionInternal(id, topics))
+          .flatMap(Set::stream)
+          .collect(Collectors.toSet());
+    }
+    return removeSubscriptionInternal(consumerId, topics);
+  }
+
+  private Set<String> removeSubscriptionInternal(
+      @NonNull final String consumerId, final Set<String> topics) {
     if (!consumerIdToConsumerMeta.containsKey(consumerId)) {
       throw new SubscriptionException(
           String.format(
@@ -190,8 +228,12 @@ public class ConsumerGroupMeta {
       if (topicNameToSubscribedConsumerIdSet.containsKey(topic)) {
         topicNameToSubscribedConsumerIdSet.get(topic).remove(consumerId);
         if (topicNameToSubscribedConsumerIdSet.get(topic).isEmpty()) {
+          // remove subscription for consumer group
           noSubscriptionTopicAfterRemoval.add(topic);
           topicNameToSubscribedConsumerIdSet.remove(topic);
+          if (shouldRecordSubscriptionCreationTime()) {
+            topicNameToSubscriptionCreationTime.remove(topic);
+          }
         }
       }
     }
@@ -226,6 +268,14 @@ public class ConsumerGroupMeta {
       ReadWriteIOUtils.write(entry.getKey(), outputStream);
       entry.getValue().serialize(outputStream);
     }
+
+    if (shouldRecordSubscriptionCreationTime()) {
+      ReadWriteIOUtils.write(topicNameToSubscriptionCreationTime.size(), 
outputStream);
+      for (final Map.Entry<String, Long> entry : 
topicNameToSubscriptionCreationTime.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue(), outputStream);
+      }
+    }
   }
 
   public static ConsumerGroupMeta deserialize(final InputStream inputStream) 
throws IOException {
@@ -256,6 +306,16 @@ public class ConsumerGroupMeta {
       consumerGroupMeta.consumerIdToConsumerMeta.put(key, value);
     }
 
+    consumerGroupMeta.topicNameToSubscriptionCreationTime = new 
ConcurrentHashMap<>();
+    if (consumerGroupMeta.shouldRecordSubscriptionCreationTime()) {
+      size = ReadWriteIOUtils.readInt(inputStream);
+      for (int i = 0; i < size; ++i) {
+        final String key = ReadWriteIOUtils.readString(inputStream);
+        final long value = ReadWriteIOUtils.readLong(inputStream);
+        consumerGroupMeta.topicNameToSubscriptionCreationTime.put(key, value);
+      }
+    }
+
     return consumerGroupMeta;
   }
 
@@ -287,6 +347,16 @@ public class ConsumerGroupMeta {
       consumerGroupMeta.consumerIdToConsumerMeta.put(key, value);
     }
 
+    consumerGroupMeta.topicNameToSubscriptionCreationTime = new 
ConcurrentHashMap<>();
+    if (consumerGroupMeta.shouldRecordSubscriptionCreationTime()) {
+      size = ReadWriteIOUtils.readInt(byteBuffer);
+      for (int i = 0; i < size; ++i) {
+        final String key = ReadWriteIOUtils.readString(byteBuffer);
+        final long value = ReadWriteIOUtils.readLong(byteBuffer);
+        consumerGroupMeta.topicNameToSubscriptionCreationTime.put(key, value);
+      }
+    }
+
     return consumerGroupMeta;
   }
 
@@ -301,11 +371,13 @@ public class ConsumerGroupMeta {
       return false;
     }
     final ConsumerGroupMeta that = (ConsumerGroupMeta) obj;
-    return Objects.equals(consumerGroupId, that.consumerGroupId)
-        && creationTime == that.creationTime
+    return Objects.equals(this.consumerGroupId, that.consumerGroupId)
+        && this.creationTime == that.creationTime
+        && Objects.equals(
+            this.topicNameToSubscribedConsumerIdSet, 
that.topicNameToSubscribedConsumerIdSet)
+        && Objects.equals(this.consumerIdToConsumerMeta, 
that.consumerIdToConsumerMeta)
         && Objects.equals(
-            topicNameToSubscribedConsumerIdSet, 
that.topicNameToSubscribedConsumerIdSet)
-        && Objects.equals(consumerIdToConsumerMeta, 
that.consumerIdToConsumerMeta);
+            this.topicNameToSubscriptionCreationTime, 
that.topicNameToSubscriptionCreationTime);
   }
 
   @Override
@@ -314,7 +386,8 @@ public class ConsumerGroupMeta {
         consumerGroupId,
         creationTime,
         topicNameToSubscribedConsumerIdSet,
-        consumerIdToConsumerMeta);
+        consumerIdToConsumerMeta,
+        topicNameToSubscriptionCreationTime);
   }
 
   @Override
@@ -323,11 +396,13 @@ public class ConsumerGroupMeta {
         + "consumerGroupId='"
         + consumerGroupId
         + "', creationTime="
-        + creationTime
+        + getCreationTime()
         + ", topicNameToSubscribedConsumerIdSet="
         + topicNameToSubscribedConsumerIdSet
         + ", consumerIdToConsumerMeta="
         + consumerIdToConsumerMeta
+        + ", topicNameToSubscriptionCreationTime="
+        + topicNameToSubscriptionCreationTime
         + "}";
   }
 }
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
index c2775b204a9..34dbbbd761d 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMetaKeeper.java
@@ -28,6 +28,7 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -91,6 +92,12 @@ public class ConsumerGroupMetaKeeper {
         : Collections.emptySet();
   }
 
+  public Optional<Long> getSubscriptionCreationTime(String consumerGroupId, 
String topic) {
+    return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId)
+        ? 
consumerGroupIdToConsumerGroupMetaMap.get(consumerGroupId).getSubscriptionTime(topic)
+        : Optional.empty();
+  }
+
   public Set<String> getTopicsSubscribedByConsumer(String consumerGroupId, 
String consumerId) {
     return consumerGroupIdToConsumerGroupMetaMap.containsKey(consumerGroupId)
         ? consumerGroupIdToConsumerGroupMetaMap
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
index 91bbb461223..8f644e1aa20 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/subscription/SubscriptionMeta.java
@@ -19,36 +19,34 @@
 
 package org.apache.iotdb.commons.subscription.meta.subscription;
 
-import org.apache.tsfile.utils.PublicBAOS;
-import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 
-import java.io.DataOutputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.HashSet;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 /** SubscriptionMeta is created for show subscription and is not stored in meta keeper. */
 public class SubscriptionMeta {
 
-  private String topicName;
+  private TopicMeta topicMeta;
   private String consumerGroupId;
   private Set<String> consumerIds;
+  private Long creationTime;
 
   private SubscriptionMeta() {
     // Empty constructor
   }
 
-  public SubscriptionMeta(String topicName, String consumerGroupId, 
Set<String> consumerIds) {
-    this.topicName = topicName;
+  public SubscriptionMeta(
+      TopicMeta topicMeta, String consumerGroupId, Set<String> consumerIds, 
Long creationTime) {
+    this.topicMeta = topicMeta;
     this.consumerGroupId = consumerGroupId;
     this.consumerIds = consumerIds;
+    this.creationTime = creationTime;
   }
 
-  public String getTopicName() {
-    return topicName;
+  public TopicMeta getTopicMeta() {
+    return topicMeta;
   }
 
   public String getConsumerGroupId() {
@@ -59,60 +57,14 @@ public class SubscriptionMeta {
     return consumerIds;
   }
 
-  public ByteBuffer serialize() throws IOException {
-    PublicBAOS byteArrayOutputStream = new PublicBAOS();
-    DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
-    serialize(outputStream);
-    return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
+  public Optional<Long> getCreationTime() {
+    return Objects.nonNull(creationTime) ? Optional.of(creationTime) : 
Optional.empty();
   }
 
-  public void serialize(DataOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(topicName, outputStream);
-    ReadWriteIOUtils.write(consumerGroupId, outputStream);
-
-    ReadWriteIOUtils.write(consumerIds.size(), outputStream);
-    for (String consumerId : consumerIds) {
-      ReadWriteIOUtils.write(consumerId, outputStream);
-    }
-  }
-
-  public void serialize(FileOutputStream outputStream) throws IOException {
-    ReadWriteIOUtils.write(topicName, outputStream);
-    ReadWriteIOUtils.write(consumerGroupId, outputStream);
-
-    ReadWriteIOUtils.write(consumerIds.size(), outputStream);
-    for (String consumerId : consumerIds) {
-      ReadWriteIOUtils.write(consumerId, outputStream);
-    }
-  }
-
-  public static SubscriptionMeta deserialize(InputStream inputStream) throws 
IOException {
-    final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
-
-    subscriptionMeta.topicName = ReadWriteIOUtils.readString(inputStream);
-    subscriptionMeta.consumerGroupId = 
ReadWriteIOUtils.readString(inputStream);
-    subscriptionMeta.consumerIds = new HashSet<>();
-
-    int size = ReadWriteIOUtils.readInt(inputStream);
-    for (int i = 0; i < size; i++) {
-      
subscriptionMeta.consumerIds.add(ReadWriteIOUtils.readString(inputStream));
-    }
-
-    return subscriptionMeta;
-  }
-
-  public static SubscriptionMeta deserialize(ByteBuffer byteBuffer) {
-    final SubscriptionMeta subscriptionMeta = new SubscriptionMeta();
-
-    subscriptionMeta.topicName = ReadWriteIOUtils.readString(byteBuffer);
-    subscriptionMeta.consumerGroupId = ReadWriteIOUtils.readString(byteBuffer);
-    subscriptionMeta.consumerIds = new HashSet<>();
-
-    int size = ReadWriteIOUtils.readInt(byteBuffer);
-    for (int i = 0; i < size; i++) {
-      
subscriptionMeta.consumerIds.add(ReadWriteIOUtils.readString(byteBuffer));
-    }
-
-    return subscriptionMeta;
+  public String getSubscriptionId() {
+    final StringBuilder subscriptionId =
+        new StringBuilder(topicMeta.getTopicName() + "_" + consumerGroupId);
+    getCreationTime().ifPresent(creationTime -> 
subscriptionId.append("_").append(creationTime));
+    return subscriptionId.toString();
   }
 }
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index f8551590964..4642f3e20af 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -884,6 +884,12 @@ struct TShowSubscriptionInfo {
     1: required string topicName
     2: required string consumerGroupId
     3: required set<string> consumerIds
+    4: optional i64 creationTime
+}
+
+struct TDropSubscriptionReq {
+    1: required string subsciptionId
+    2: optional bool ifExistsCondition
 }
 
 struct TGetAllSubscriptionInfoResp {
@@ -1667,9 +1673,12 @@ service IConfigNodeRPCService {
   /** Create subscription */
   common.TSStatus createSubscription(TSubscribeReq req)
 
-  /** Close subscription */
+  /** Close subscription by consumer */
   common.TSStatus dropSubscription(TUnsubscribeReq req)
 
+  /** Close subscription by session */
+  common.TSStatus dropSubscriptionById(TDropSubscriptionReq req)
+
   /** Show Subscription on topic name, if name is empty, show all subscriptions */
   TShowSubscriptionResp showSubscription(TShowSubscriptionReq req)
 

Reply via email to