This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 49bd622513c Subscription: support unsubscribe from completed topics 
under client heartbeat thread (#15595) (#15603)
49bd622513c is described below

commit 49bd622513c6c45187e73e903c7e1b5bc2009a10
Author: VGalaxies <[email protected]>
AuthorDate: Thu May 29 18:26:06 2025 +0800

    Subscription: support unsubscribe from completed topics under client 
heartbeat thread (#15595) (#15603)
---
 .../IoTDBSnapshotDevicePullConsumerDataSetIT.java  |  3 ++
 .../response/PipeSubscribeHeartbeatResp.java       | 61 ++++++++++++++++++----
 .../SubscriptionSessionConnection.java             | 32 ------------
 .../consumer/SubscriptionConsumer.java             |  2 +-
 .../consumer/SubscriptionProvider.java             |  6 +--
 .../consumer/SubscriptionProviders.java            | 17 ++++--
 .../agent/SubscriptionBrokerAgent.java             | 11 ++++
 .../db/subscription/broker/SubscriptionBroker.java | 19 +++++++
 .../receiver/SubscriptionReceiverV1.java           | 51 ++++++++++++++++--
 9 files changed, 150 insertions(+), 52 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
index 10a83a7e674..aca980b88e0 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
@@ -147,5 +147,8 @@ public class IoTDBSnapshotDevicePullConsumerDataSetIT 
extends AbstractSubscripti
     consume_data(consumer, session_dest);
     check_count(8, "select count(s_0) from " + device, "Consume data again:" + 
pattern);
     check_count(8, "select count(s_1) from " + device, "Consumption data: 
s_1");
+    while (!consumer.allTopicMessagesHaveBeenConsumed()) {
+      Thread.sleep(1000);
+    }
   }
 }
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
index 34a927ef29c..7bf02c7c6e7 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.rpc.subscription.payload.response;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
@@ -29,8 +30,10 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -38,10 +41,23 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
 
   private transient Map<String, TopicConfig> topics = new HashMap<>(); // 
subscribed topics
 
+  private transient Map<Integer, TEndPoint> endPoints = new HashMap<>(); // 
available endpoints
+
+  private transient List<String> topicNamesToUnsubscribe =
+      new ArrayList<>(); // topics should be unsubscribed
+
   public Map<String, TopicConfig> getTopics() {
     return topics;
   }
 
+  public Map<Integer, TEndPoint> getEndPoints() {
+    return endPoints;
+  }
+
+  public List<String> getTopicNamesToUnsubscribe() {
+    return topicNamesToUnsubscribe;
+  }
+
   /////////////////////////////// Thrift ///////////////////////////////
 
   /**
@@ -63,7 +79,11 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
    * server.
    */
   public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
-      final TSStatus status, final Map<String, TopicConfig> topics) throws 
IOException {
+      final TSStatus status,
+      final Map<String, TopicConfig> topics,
+      final Map<Integer, TEndPoint> endPoints,
+      final List<String> topicNamesToUnsubscribe)
+      throws IOException {
     final PipeSubscribeHeartbeatResp resp = toTPipeSubscribeResp(status);
 
     try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -73,6 +93,13 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
         ReadWriteIOUtils.write(entry.getKey(), outputStream);
         entry.getValue().serialize(outputStream);
       }
+      ReadWriteIOUtils.write(endPoints.size(), outputStream);
+      for (final Map.Entry<Integer, TEndPoint> entry : endPoints.entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue().getIp(), outputStream);
+        ReadWriteIOUtils.write(entry.getValue().getPort(), outputStream);
+      }
+      ReadWriteIOUtils.writeStringList(topicNamesToUnsubscribe, outputStream);
       resp.body =
           Collections.singletonList(
               ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size()));
@@ -89,14 +116,28 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
     if (Objects.nonNull(heartbeatResp.body)) {
       for (final ByteBuffer byteBuffer : heartbeatResp.body) {
         if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
-          final int size = ReadWriteIOUtils.readInt(byteBuffer);
-          final Map<String, TopicConfig> topics = new HashMap<>();
-          for (int i = 0; i < size; i++) {
-            final String topicName = ReadWriteIOUtils.readString(byteBuffer);
-            final TopicConfig topicConfig = 
TopicConfig.deserialize(byteBuffer);
-            topics.put(topicName, topicConfig);
+          {
+            final int size = ReadWriteIOUtils.readInt(byteBuffer);
+            final Map<String, TopicConfig> topics = new HashMap<>();
+            for (int i = 0; i < size; i++) {
+              final String topicName = ReadWriteIOUtils.readString(byteBuffer);
+              final TopicConfig topicConfig = 
TopicConfig.deserialize(byteBuffer);
+              topics.put(topicName, topicConfig);
+            }
+            resp.topics = topics;
+          }
+          {
+            final int size = ReadWriteIOUtils.readInt(byteBuffer);
+            final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+            for (int i = 0; i < size; i++) {
+              final int nodeId = ReadWriteIOUtils.readInt(byteBuffer);
+              final String ip = ReadWriteIOUtils.readString(byteBuffer);
+              final int port = ReadWriteIOUtils.readInt(byteBuffer);
+              endPoints.put(nodeId, new TEndPoint(ip, port));
+            }
+            resp.endPoints = endPoints;
           }
-          resp.topics = topics;
+          resp.topicNamesToUnsubscribe = 
ReadWriteIOUtils.readStringList(byteBuffer);
           break;
         }
       }
@@ -122,6 +163,8 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
     }
     final PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj;
     return Objects.equals(this.topics, that.topics)
+        && Objects.equals(this.endPoints, that.endPoints)
+        && Objects.equals(this.topicNamesToUnsubscribe, 
that.topicNamesToUnsubscribe)
         && Objects.equals(this.status, that.status)
         && this.version == that.version
         && this.type == that.type
@@ -130,6 +173,6 @@ public class PipeSubscribeHeartbeatResp extends 
TPipeSubscribeResp {
 
   @Override
   public int hashCode() {
-    return Objects.hash(topics, status, version, type, body);
+    return Objects.hash(topics, endPoints, topicNamesToUnsubscribe, status, 
version, type, body);
   }
 }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index 135e1055f47..1245a397442 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -20,9 +20,7 @@
 package org.apache.iotdb.session.subscription;
 
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
 import org.apache.iotdb.session.Session;
@@ -31,20 +29,11 @@ import org.apache.iotdb.session.SessionConnection;
 import org.apache.thrift.TException;
 
 import java.time.ZoneId;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.function.Supplier;
 
 public class SubscriptionSessionConnection extends SessionConnection {
 
-  private static final String SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
-  private static final String NODE_ID_COLUMN_NAME = "NodeID";
-  private static final String STATUS_COLUMN_NAME = "Status";
-  private static final String IP_COLUMN_NAME = "RpcAddress";
-  private static final String PORT_COLUMN_NAME = "RpcPort";
-  private static final String REMOVING_STATUS = "Removing";
-
   public SubscriptionSessionConnection(
       Session session,
       TEndPoint endPoint,
@@ -56,27 +45,6 @@ public class SubscriptionSessionConnection extends 
SessionConnection {
     super(session, endPoint, zoneId, availableNodes, maxRetryCount, 
retryIntervalInMs);
   }
 
-  // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
-  public Map<Integer, TEndPoint> fetchAllEndPoints()
-      throws IoTDBConnectionException, StatementExecutionException {
-    SessionDataSet dataSet = 
session.executeQueryStatement(SHOW_DATA_NODES_COMMAND);
-    SessionDataSet.DataIterator iterator = dataSet.iterator();
-    Map<Integer, TEndPoint> endPoints = new HashMap<>();
-    while (iterator.next()) {
-      // ignore removing DN
-      if (REMOVING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
-        continue;
-      }
-      String ip = iterator.getString(IP_COLUMN_NAME);
-      String port = iterator.getString(PORT_COLUMN_NAME);
-      if (ip != null && port != null) {
-        endPoints.put(
-            iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip, 
Integer.parseInt(port)));
-      }
-    }
-    return endPoints;
-  }
-
   public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) throws 
TException {
     return client.pipeSubscribe(req);
   }
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 7add29b4d7c..9e7242de200 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -1359,7 +1359,7 @@ abstract class SubscriptionConsumer implements 
AutoCloseable {
     }
     for (final SubscriptionProvider provider : providers) {
       try {
-        return provider.getSessionConnection().fetchAllEndPoints();
+        return provider.heartbeat().getEndPoints();
       } catch (final Exception e) {
         LOGGER.warn(
             "{} failed to fetch all endpoints from subscription provider {}, 
try next subscription provider...",
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index dc717e6a4c6..8d50fda6070 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -218,7 +218,7 @@ final class SubscriptionProvider extends 
SubscriptionSession {
 
   /////////////////////////////// subscription APIs 
///////////////////////////////
 
-  Map<String, TopicConfig> heartbeat() throws SubscriptionException {
+  PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException {
     final TPipeSubscribeResp resp;
     try {
       resp = 
getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
@@ -232,9 +232,7 @@ final class SubscriptionProvider extends 
SubscriptionSession {
       throw new SubscriptionConnectionException(e.getMessage(), e);
     }
     verifyPipeSubscribeSuccess(resp.status);
-    final PipeSubscribeHeartbeatResp heartbeatResp =
-        PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
-    return heartbeatResp.getTopics();
+    return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
   }
 
   Map<String, TopicConfig> subscribe(final Set<String> topicNames) throws 
SubscriptionException {
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
index 3d09cc3516a..6c0b7d03b18 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import 
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -94,7 +95,7 @@ final class SubscriptionProviders {
 
       final Map<Integer, TEndPoint> allEndPoints;
       try {
-        allEndPoints = 
defaultProvider.getSessionConnection().fetchAllEndPoints();
+        allEndPoints = defaultProvider.heartbeat().getEndPoints();
       } catch (final Exception e) {
         LOGGER.warn(
             "{} failed to fetch all endpoints from {} because of {}", 
consumer, endPoint, e, e);
@@ -243,7 +244,17 @@ final class SubscriptionProviders {
   private void heartbeatInternal(final SubscriptionConsumer consumer) {
     for (final SubscriptionProvider provider : getAllProviders()) {
       try {
-        consumer.subscribedTopics = provider.heartbeat();
+        final PipeSubscribeHeartbeatResp resp = provider.heartbeat();
+        // update subscribed topics
+        consumer.subscribedTopics = resp.getTopics();
+        // unsubscribe completed topics
+        for (final String topicName : resp.getTopicNamesToUnsubscribe()) {
+          LOGGER.info(
+              "Termination occurred when SubscriptionConsumer {} polling 
topics, unsubscribe topic {} automatically",
+              consumer.coreReportMessage(),
+              topicName);
+          consumer.unsubscribe(topicName);
+        }
         provider.setAvailable();
       } catch (final Exception e) {
         LOGGER.warn(
@@ -308,7 +319,7 @@ final class SubscriptionProviders {
       } else {
         // existing provider
         try {
-          consumer.subscribedTopics = provider.heartbeat();
+          consumer.subscribedTopics = provider.heartbeat().getTopics();
           provider.setAvailable();
         } catch (final Exception e) {
           LOGGER.warn(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index b750834f4ce..2ea4f0a731e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -30,6 +30,7 @@ import 
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -125,6 +126,16 @@ public class SubscriptionBrokerAgent {
     return broker.isCommitContextOutdated(commitContext);
   }
 
+  public List<String> fetchTopicNamesToUnsubscribe(
+      final ConsumerConfig consumerConfig, final Set<String> topicNames) {
+    final String consumerGroupId = consumerConfig.getConsumerGroupId();
+    final SubscriptionBroker broker = 
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+    if (Objects.isNull(broker)) {
+      return Collections.emptyList();
+    }
+    return broker.fetchTopicNamesToUnsubscribe(topicNames);
+  }
+
   /////////////////////////////// broker ///////////////////////////////
 
   public boolean isBrokerExist(final String consumerGroupId) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 6106b60cabf..0b434b7b331 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -339,6 +339,25 @@ public class SubscriptionBroker {
     return prefetchingQueue.isCommitContextOutdated(commitContext);
   }
 
+  public List<String> fetchTopicNamesToUnsubscribe(final Set<String> 
topicNames) {
+    final List<String> topicNamesToUnsubscribe = new ArrayList<>();
+
+    for (final String topicName : topicNames) {
+      final SubscriptionPrefetchingQueue prefetchingQueue =
+          topicNameToPrefetchingQueue.get(topicName);
+      // If there is no prefetching queue for the topic, check if it's 
completed
+      if (Objects.isNull(prefetchingQueue) && 
completedTopicNames.containsKey(topicName)) {
+        LOGGER.info(
+            "Subscription: prefetching queue bound to topic [{}] for consumer 
group [{}] is completed, reply to client heartbeat request",
+            topicName,
+            brokerId);
+        topicNamesToUnsubscribe.add(topicName);
+      }
+    }
+
+    return topicNamesToUnsubscribe;
+  }
+
   /////////////////////////////// prefetching queue 
///////////////////////////////
 
   public void bindPrefetchingQueue(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 87fde40a432..34acb35d880 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -19,13 +19,17 @@
 
 package org.apache.iotdb.db.subscription.receiver;
 
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.consensus.ConfigRegionId;
 import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,6 +43,7 @@ import 
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetri
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
 import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionPayloadExceedException;
 import 
org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
@@ -78,7 +83,9 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -260,13 +267,51 @@ public class SubscriptionReceiverV1 implements 
SubscriptionReceiver {
     // TODO: do something
 
     LOGGER.info("Subscription: consumer {} heartbeat successfully", 
consumerConfig);
-    return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
-        RpcUtils.SUCCESS_STATUS,
+
+    // fetch subscribed topics
+    final Map<String, TopicConfig> topics =
         SubscriptionAgent.topic()
             .getTopicConfigs(
                 SubscriptionAgent.consumer()
                     .getTopicNamesSubscribedByConsumer(
-                        consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId())));
+                        consumerConfig.getConsumerGroupId(), 
consumerConfig.getConsumerId()));
+
+    // fetch available endpoints
+    final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+    try (final ConfigNodeClient configNodeClient =
+        
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+      final TShowDataNodesResp resp = configNodeClient.showDataNodes();
+      // refer to org.apache.iotdb.session.NodesSupplier.updateDataNodeList
+
+      for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
+        // ignore removing DN
+        if (Objects.equals(NodeStatus.Removing.getStatus(), 
dataNodeInfo.getStatus())) {
+          continue;
+        }
+        final String ip = dataNodeInfo.getRpcAddresss();
+        final int port = dataNodeInfo.getRpcPort();
+        if (ip != null && port != 0) {
+          endPoints.put(dataNodeInfo.getDataNodeId(), new TEndPoint(ip, port));
+        }
+      }
+    } catch (final ClientManagerException | TException e) {
+      LOGGER.warn(
+          "Exception occurred when fetch endpoints for consumer {} in config 
node",
+          consumerConfig,
+          e);
+      final String exceptionMessage =
+          String.format(
+              "Subscription: Failed to fetch endpoints for consumer %s in 
config node, exception is %s.",
+              consumerConfig, e);
+      throw new SubscriptionException(exceptionMessage);
+    }
+
+    // fetch topics should be unsubscribed
+    final List<String> topicNamesToUnsubscribe =
+        
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig, 
topics.keySet());
+
+    return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
+        RpcUtils.SUCCESS_STATUS, topics, endPoints, topicNamesToUnsubscribe);
   }
 
   private TPipeSubscribeResp handlePipeSubscribeSubscribe(final 
PipeSubscribeSubscribeReq req) {

Reply via email to