This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a477562aba8 Subscription: unsubscribe completed topics when remove
consumer config (#15660)
a477562aba8 is described below
commit a477562aba88ca29d1a5aa794c15fe84b1885678
Author: VGalaxies <[email protected]>
AuthorDate: Mon Jun 9 11:10:42 2025 +0800
Subscription: unsubscribe completed topics when remove consumer config
(#15660)
---
.../receiver/SubscriptionReceiverV1.java | 28 ++++++++++++++++++++++
1 file changed, 28 insertions(+)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 1a6ee0a891e..f135dadd6a1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -84,6 +84,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -162,7 +163,12 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
LOGGER.info(
"Subscription: remove consumer config {} when handling exit",
consumerConfigThreadLocal.get());
+ // we should not close the consumer here because it might reuse the previous consumption
+ // progress to continue consuming
// closeConsumer(consumerConfig);
+ // when handling exit, unsubscribe from topics that have already been completed as much as
+ // possible to release some resources (such as the underlying pipe) in a timely manner
+ unsubscribeCompleteTopics(consumerConfig);
consumerConfigThreadLocal.remove();
}
}
@@ -311,6 +317,8 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
// fetch topics should be unsubscribed
final List<String> topicNamesToUnsubscribe =
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, topics.keySet());
+ // here we did not immediately unsubscribe from topics in order to allow the client to perceive
+ // completed topics
return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
RpcUtils.SUCCESS_STATUS, topics, endPoints, topicNamesToUnsubscribe);
@@ -684,6 +692,26 @@ public class SubscriptionReceiverV1 implements SubscriptionReceiver {
LOGGER.info("Subscription: consumer {} close successfully", consumerConfig);
}
+ private void unsubscribeCompleteTopics(final ConsumerConfig consumerConfig) {
+ // fetch subscribed topics
+ final Map<String, TopicConfig> topics =
+ SubscriptionAgent.topic()
+ .getTopicConfigs(
+ SubscriptionAgent.consumer()
+ .getTopicNamesSubscribedByConsumer(
+ consumerConfig.getConsumerGroupId(), consumerConfig.getConsumerId()));
+
+ // fetch topics should be unsubscribed
+ final List<String> topicNamesToUnsubscribe =
+ SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, topics.keySet());
+
+ unsubscribe(consumerConfig, new HashSet<>(topicNamesToUnsubscribe));
+ LOGGER.info(
+ "Subscription: consumer {} unsubscribe {} (completed topics) successfully",
+ consumerConfig,
+ topicNamesToUnsubscribe);
+ }
+
//////////////////////////// consumer operations ////////////////////////////
private void createConsumer(final ConsumerConfig consumerConfig) throws SubscriptionException {