This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 49bd622513c Subscription: support unsubscribe from completed topics
under client heartbeat thread (#15595) (#15603)
49bd622513c is described below
commit 49bd622513c6c45187e73e903c7e1b5bc2009a10
Author: VGalaxies <[email protected]>
AuthorDate: Thu May 29 18:26:06 2025 +0800
Subscription: support unsubscribe from completed topics under client
heartbeat thread (#15595) (#15603)
---
.../IoTDBSnapshotDevicePullConsumerDataSetIT.java | 3 ++
.../response/PipeSubscribeHeartbeatResp.java | 61 ++++++++++++++++++----
.../SubscriptionSessionConnection.java | 32 ------------
.../consumer/SubscriptionConsumer.java | 2 +-
.../consumer/SubscriptionProvider.java | 6 +--
.../consumer/SubscriptionProviders.java | 17 ++++--
.../agent/SubscriptionBrokerAgent.java | 11 ++++
.../db/subscription/broker/SubscriptionBroker.java | 19 +++++++
.../receiver/SubscriptionReceiverV1.java | 51 ++++++++++++++++--
9 files changed, 150 insertions(+), 52 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
index 10a83a7e674..aca980b88e0 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/mode/IoTDBSnapshotDevicePullConsumerDataSetIT.java
@@ -147,5 +147,8 @@ public class IoTDBSnapshotDevicePullConsumerDataSetIT
extends AbstractSubscripti
consume_data(consumer, session_dest);
check_count(8, "select count(s_0) from " + device, "Consume data again:" +
pattern);
check_count(8, "select count(s_1) from " + device, "Consumption data:
s_1");
+ while (!consumer.allTopicMessagesHaveBeenConsumed()) {
+ Thread.sleep(1000);
+ }
}
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
index 34a927ef29c..7bf02c7c6e7 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/payload/response/PipeSubscribeHeartbeatResp.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.rpc.subscription.payload.response;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
@@ -29,8 +30,10 @@ import org.apache.tsfile.utils.ReadWriteIOUtils;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -38,10 +41,23 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
private transient Map<String, TopicConfig> topics = new HashMap<>(); //
subscribed topics
+ private transient Map<Integer, TEndPoint> endPoints = new HashMap<>(); //
available endpoints
+
+ private transient List<String> topicNamesToUnsubscribe =
+ new ArrayList<>(); // topics should be unsubscribed
+
public Map<String, TopicConfig> getTopics() {
return topics;
}
+ public Map<Integer, TEndPoint> getEndPoints() {
+ return endPoints;
+ }
+
+ public List<String> getTopicNamesToUnsubscribe() {
+ return topicNamesToUnsubscribe;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
/**
@@ -63,7 +79,11 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
* server.
*/
public static PipeSubscribeHeartbeatResp toTPipeSubscribeResp(
- final TSStatus status, final Map<String, TopicConfig> topics) throws
IOException {
+ final TSStatus status,
+ final Map<String, TopicConfig> topics,
+ final Map<Integer, TEndPoint> endPoints,
+ final List<String> topicNamesToUnsubscribe)
+ throws IOException {
final PipeSubscribeHeartbeatResp resp = toTPipeSubscribeResp(status);
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
@@ -73,6 +93,13 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
entry.getValue().serialize(outputStream);
}
+ ReadWriteIOUtils.write(endPoints.size(), outputStream);
+ for (final Map.Entry<Integer, TEndPoint> entry : endPoints.entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue().getIp(), outputStream);
+ ReadWriteIOUtils.write(entry.getValue().getPort(), outputStream);
+ }
+ ReadWriteIOUtils.writeStringList(topicNamesToUnsubscribe, outputStream);
resp.body =
Collections.singletonList(
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size()));
@@ -89,14 +116,28 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
if (Objects.nonNull(heartbeatResp.body)) {
for (final ByteBuffer byteBuffer : heartbeatResp.body) {
if (Objects.nonNull(byteBuffer) && byteBuffer.hasRemaining()) {
- final int size = ReadWriteIOUtils.readInt(byteBuffer);
- final Map<String, TopicConfig> topics = new HashMap<>();
- for (int i = 0; i < size; i++) {
- final String topicName = ReadWriteIOUtils.readString(byteBuffer);
- final TopicConfig topicConfig =
TopicConfig.deserialize(byteBuffer);
- topics.put(topicName, topicConfig);
+ {
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ final Map<String, TopicConfig> topics = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ final String topicName = ReadWriteIOUtils.readString(byteBuffer);
+ final TopicConfig topicConfig =
TopicConfig.deserialize(byteBuffer);
+ topics.put(topicName, topicConfig);
+ }
+ resp.topics = topics;
+ }
+ {
+ final int size = ReadWriteIOUtils.readInt(byteBuffer);
+ final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ final int nodeId = ReadWriteIOUtils.readInt(byteBuffer);
+ final String ip = ReadWriteIOUtils.readString(byteBuffer);
+ final int port = ReadWriteIOUtils.readInt(byteBuffer);
+ endPoints.put(nodeId, new TEndPoint(ip, port));
+ }
+ resp.endPoints = endPoints;
}
- resp.topics = topics;
+ resp.topicNamesToUnsubscribe =
ReadWriteIOUtils.readStringList(byteBuffer);
break;
}
}
@@ -122,6 +163,8 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
}
final PipeSubscribeHeartbeatResp that = (PipeSubscribeHeartbeatResp) obj;
return Objects.equals(this.topics, that.topics)
+ && Objects.equals(this.endPoints, that.endPoints)
+ && Objects.equals(this.topicNamesToUnsubscribe,
that.topicNamesToUnsubscribe)
&& Objects.equals(this.status, that.status)
&& this.version == that.version
&& this.type == that.type
@@ -130,6 +173,6 @@ public class PipeSubscribeHeartbeatResp extends
TPipeSubscribeResp {
@Override
public int hashCode() {
- return Objects.hash(topics, status, version, type, body);
+ return Objects.hash(topics, endPoints, topicNamesToUnsubscribe, status,
version, type, body);
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index 135e1055f47..1245a397442 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -20,9 +20,7 @@
package org.apache.iotdb.session.subscription;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
-import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeReq;
import org.apache.iotdb.service.rpc.thrift.TPipeSubscribeResp;
import org.apache.iotdb.session.Session;
@@ -31,20 +29,11 @@ import org.apache.iotdb.session.SessionConnection;
import org.apache.thrift.TException;
import java.time.ZoneId;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.function.Supplier;
public class SubscriptionSessionConnection extends SessionConnection {
- private static final String SHOW_DATA_NODES_COMMAND = "SHOW DATANODES";
- private static final String NODE_ID_COLUMN_NAME = "NodeID";
- private static final String STATUS_COLUMN_NAME = "Status";
- private static final String IP_COLUMN_NAME = "RpcAddress";
- private static final String PORT_COLUMN_NAME = "RpcPort";
- private static final String REMOVING_STATUS = "Removing";
-
public SubscriptionSessionConnection(
Session session,
TEndPoint endPoint,
@@ -56,27 +45,6 @@ public class SubscriptionSessionConnection extends
SessionConnection {
super(session, endPoint, zoneId, availableNodes, maxRetryCount,
retryIntervalInMs);
}
- // from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
- public Map<Integer, TEndPoint> fetchAllEndPoints()
- throws IoTDBConnectionException, StatementExecutionException {
- SessionDataSet dataSet =
session.executeQueryStatement(SHOW_DATA_NODES_COMMAND);
- SessionDataSet.DataIterator iterator = dataSet.iterator();
- Map<Integer, TEndPoint> endPoints = new HashMap<>();
- while (iterator.next()) {
- // ignore removing DN
- if (REMOVING_STATUS.equals(iterator.getString(STATUS_COLUMN_NAME))) {
- continue;
- }
- String ip = iterator.getString(IP_COLUMN_NAME);
- String port = iterator.getString(PORT_COLUMN_NAME);
- if (ip != null && port != null) {
- endPoints.put(
- iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip,
Integer.parseInt(port)));
- }
- }
- return endPoints;
- }
-
public TPipeSubscribeResp pipeSubscribe(final TPipeSubscribeReq req) throws
TException {
return client.pipeSubscribe(req);
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
index 7add29b4d7c..9e7242de200 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java
@@ -1359,7 +1359,7 @@ abstract class SubscriptionConsumer implements
AutoCloseable {
}
for (final SubscriptionProvider provider : providers) {
try {
- return provider.getSessionConnection().fetchAllEndPoints();
+ return provider.heartbeat().getEndPoints();
} catch (final Exception e) {
LOGGER.warn(
"{} failed to fetch all endpoints from subscription provider {},
try next subscription provider...",
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index dc717e6a4c6..8d50fda6070 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -218,7 +218,7 @@ final class SubscriptionProvider extends
SubscriptionSession {
/////////////////////////////// subscription APIs
///////////////////////////////
- Map<String, TopicConfig> heartbeat() throws SubscriptionException {
+ PipeSubscribeHeartbeatResp heartbeat() throws SubscriptionException {
final TPipeSubscribeResp resp;
try {
resp =
getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
@@ -232,9 +232,7 @@ final class SubscriptionProvider extends
SubscriptionSession {
throw new SubscriptionConnectionException(e.getMessage(), e);
}
verifyPipeSubscribeSuccess(resp.status);
- final PipeSubscribeHeartbeatResp heartbeatResp =
- PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
- return heartbeatResp.getTopics();
+ return PipeSubscribeHeartbeatResp.fromTPipeSubscribeResp(resp);
}
Map<String, TopicConfig> subscribe(final Set<String> topicNames) throws
SubscriptionException {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
index 3d09cc3516a..6c0b7d03b18 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProviders.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionConnectionException;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import
org.apache.iotdb.rpc.subscription.payload.response.PipeSubscribeHeartbeatResp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,7 +95,7 @@ final class SubscriptionProviders {
final Map<Integer, TEndPoint> allEndPoints;
try {
- allEndPoints =
defaultProvider.getSessionConnection().fetchAllEndPoints();
+ allEndPoints = defaultProvider.heartbeat().getEndPoints();
} catch (final Exception e) {
LOGGER.warn(
"{} failed to fetch all endpoints from {} because of {}",
consumer, endPoint, e, e);
@@ -243,7 +244,17 @@ final class SubscriptionProviders {
private void heartbeatInternal(final SubscriptionConsumer consumer) {
for (final SubscriptionProvider provider : getAllProviders()) {
try {
- consumer.subscribedTopics = provider.heartbeat();
+ final PipeSubscribeHeartbeatResp resp = provider.heartbeat();
+ // update subscribed topics
+ consumer.subscribedTopics = resp.getTopics();
+ // unsubscribe completed topics
+ for (final String topicName : resp.getTopicNamesToUnsubscribe()) {
+ LOGGER.info(
+ "Termination occurred when SubscriptionConsumer {} polling
topics, unsubscribe topic {} automatically",
+ consumer.coreReportMessage(),
+ topicName);
+ consumer.unsubscribe(topicName);
+ }
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
@@ -308,7 +319,7 @@ final class SubscriptionProviders {
} else {
// existing provider
try {
- consumer.subscribedTopics = provider.heartbeat();
+ consumer.subscribedTopics = provider.heartbeat().getTopics();
provider.setAvailable();
} catch (final Exception e) {
LOGGER.warn(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
index b750834f4ce..2ea4f0a731e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -125,6 +126,16 @@ public class SubscriptionBrokerAgent {
return broker.isCommitContextOutdated(commitContext);
}
+ public List<String> fetchTopicNamesToUnsubscribe(
+ final ConsumerConfig consumerConfig, final Set<String> topicNames) {
+ final String consumerGroupId = consumerConfig.getConsumerGroupId();
+ final SubscriptionBroker broker =
consumerGroupIdToSubscriptionBroker.get(consumerGroupId);
+ if (Objects.isNull(broker)) {
+ return Collections.emptyList();
+ }
+ return broker.fetchTopicNamesToUnsubscribe(topicNames);
+ }
+
/////////////////////////////// broker ///////////////////////////////
public boolean isBrokerExist(final String consumerGroupId) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
index 6106b60cabf..0b434b7b331 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionBroker.java
@@ -339,6 +339,25 @@ public class SubscriptionBroker {
return prefetchingQueue.isCommitContextOutdated(commitContext);
}
+ public List<String> fetchTopicNamesToUnsubscribe(final Set<String>
topicNames) {
+ final List<String> topicNamesToUnsubscribe = new ArrayList<>();
+
+ for (final String topicName : topicNames) {
+ final SubscriptionPrefetchingQueue prefetchingQueue =
+ topicNameToPrefetchingQueue.get(topicName);
+ // If there is no prefetching queue for the topic, check if it's
completed
+ if (Objects.isNull(prefetchingQueue) &&
completedTopicNames.containsKey(topicName)) {
+ LOGGER.info(
+ "Subscription: prefetching queue bound to topic [{}] for consumer
group [{}] is completed, reply to client heartbeat request",
+ topicName,
+ brokerId);
+ topicNamesToUnsubscribe.add(topicName);
+ }
+ }
+
+ return topicNamesToUnsubscribe;
+ }
+
/////////////////////////////// prefetching queue
///////////////////////////////
public void bindPrefetchingQueue(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index 87fde40a432..34acb35d880 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -19,13 +19,17 @@
package org.apache.iotdb.db.subscription.receiver;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -39,6 +43,7 @@ import
org.apache.iotdb.db.subscription.metric.SubscriptionPrefetchingQueueMetri
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.config.TopicConfig;
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionPayloadExceedException;
import
org.apache.iotdb.rpc.subscription.exception.SubscriptionPipeTimeoutException;
@@ -78,7 +83,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -260,13 +267,51 @@ public class SubscriptionReceiverV1 implements
SubscriptionReceiver {
// TODO: do something
LOGGER.info("Subscription: consumer {} heartbeat successfully",
consumerConfig);
- return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
- RpcUtils.SUCCESS_STATUS,
+
+ // fetch subscribed topics
+ final Map<String, TopicConfig> topics =
SubscriptionAgent.topic()
.getTopicConfigs(
SubscriptionAgent.consumer()
.getTopicNamesSubscribedByConsumer(
- consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId())));
+ consumerConfig.getConsumerGroupId(),
consumerConfig.getConsumerId()));
+
+ // fetch available endpoints
+ final Map<Integer, TEndPoint> endPoints = new HashMap<>();
+ try (final ConfigNodeClient configNodeClient =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ final TShowDataNodesResp resp = configNodeClient.showDataNodes();
+ // refer to org.apache.iotdb.session.NodesSupplier.updateDataNodeList
+
+ for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
+ // ignore removing DN
+ if (Objects.equals(NodeStatus.Removing.getStatus(),
dataNodeInfo.getStatus())) {
+ continue;
+ }
+ final String ip = dataNodeInfo.getRpcAddresss();
+ final int port = dataNodeInfo.getRpcPort();
+ if (ip != null && port != 0) {
+ endPoints.put(dataNodeInfo.getDataNodeId(), new TEndPoint(ip, port));
+ }
+ }
+ } catch (final ClientManagerException | TException e) {
+ LOGGER.warn(
+ "Exception occurred when fetch endpoints for consumer {} in config
node",
+ consumerConfig,
+ e);
+ final String exceptionMessage =
+ String.format(
+ "Subscription: Failed to fetch endpoints for consumer %s in
config node, exception is %s.",
+ consumerConfig, e);
+ throw new SubscriptionException(exceptionMessage);
+ }
+
+ // fetch topics should be unsubscribed
+ final List<String> topicNamesToUnsubscribe =
+
SubscriptionAgent.broker().fetchTopicNamesToUnsubscribe(consumerConfig,
topics.keySet());
+
+ return PipeSubscribeHeartbeatResp.toTPipeSubscribeResp(
+ RpcUtils.SUCCESS_STATUS, topics, endPoints, topicNamesToUnsubscribe);
}
private TPipeSubscribeResp handlePipeSubscribeSubscribe(final
PipeSubscribeSubscribeReq req) {