This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a477562aba8 Subscription: unsubscribe completed topics when remove 
consumer config (#15660)
a477562aba8 is described below

commit a477562aba88ca29d1a5aa794c15fe84b1885678
Author: VGalaxies <[email protected]>
AuthorDate: Mon Jun 9 11:10:42 2025 +0800

    Subscription: unsubscribe completed topics when remove consumer config 
(#15660)
---
 .../receiver/SubscriptionReceiverV1.java           | 28 ++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 1a6ee0a891e..f135dadd6a1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -84,6 +84,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -162,7 +163,12 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
       LOGGER.info(
           "Subscription: remove consumer config {} when handling exit",
           consumerConfigThreadLocal.get());
+      // we should not close the consumer here because it might reuse the 
previous consumption
+      // progress to continue consuming
       // closeConsumer(consumerConfig);
+      // when handling exit, unsubscribe from topics that have already been 
completed as much as
+      // possible to release some resources (such as the underlying pipe) in a 
timely manner
+      unsubscribeCompleteTopics(consumerConfig);
       consumerConfigThreadLocal.remove();
     }
   }
@@ -311,6 +317,8 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     // fetch topics should be unsubscribed
     final List<String> topicNamesToUnsubscribe =
         
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, 
topics.keySet());
+    // here we did not immediately unsubscribe from topics in order to allow 
the client to perceive
+    // completed topics
 
     return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
         RpcUtils.SUCCESS_STATUS, topics, endPoints, topicNamesToUnsubscribe);
@@ -684,6 +692,26 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     LOGGER.info("Subscription: consumer {} close successfully", 
consumerConfig);
   }
 
+  private void unsubscribeCompleteTopics(final ConsumerConfig consumerConfig) {
+    // fetch subscribed topics
+    final Map<String, TopicConfig> topics =
+        SubscriptionAgent.topic()
+            .getTopicConfigs(
+                SubscriptionAgent.consumer()
+                    .getTopicNamesSubscribedByConsumer(
+                        consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId()));
+
+    // fetch topics should be unsubscribed
+    final List<String> topicNamesToUnsubscribe =
+        
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, 
topics.keySet());
+
+    unsubscribe(consumerConfig, new HashSet<>(topicNamesToUnsubscribe));
+    LOGGER.info(
+        "Subscription: consumer {} unsubscribe {} (completed topics) 
successfully",
+        consumerConfig,
+        topicNamesToUnsubscribe);
+  }
+
   //////////////////////////// consumer operations ////////////////////////////
 
   private void createConsumer(final ConsumerConfig consumerConfig) throws 
SubscriptionException {

Reply via email to