This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 75b5fa607b4 Subscription: intro node-urls and endpoints syncer for
consumer high availablity (#12275)
75b5fa607b4 is described below
commit 75b5fa607b41d8816e3d9581beccae81b68369cf
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Apr 10 10:21:44 2024 +0800
Subscription: intro node-urls and endpoints syncer for consumer high
availablity (#12275)
After this commit, it will be possible to specify either host:port or
node-urls as initial endpoints when creating a subscription consumer. The
priority of host:port is higher than node-urls. Upon opening the consumer, it
will iterate through the initial endpoints for handshake to obtain the
endpoints of all DNs in the entire cluster. Additionally, there will be an
endpoints syncer that periodically checks for the addition or removal of DN
nodes in the cluster, dynamically adjusting t [...]
---
.../it/dual/IoTDBSubscriptionTopicIT.java | 4 +-
.../it/local/IoTDBSubscriptionBasicIT.java | 5 +-
.../it/local/IoTDBSubscriptionIdempotentIT.java | 5 +-
.../it/local/IoTDBSubscriptionRestartIT.java | 289 +++++++++++++++-
.../rpc/subscription/config/ConsumerConstant.java | 17 +-
.../exception}/SubscriptionException.java | 2 +-
.../SubscriptionParameterNotValidException.java} | 18 +-
.../SubscriptionPollTimeOutException.java | 16 +-
.../subscription/ConsumerHeartbeatWorker.java | 26 +-
.../subscription/PullConsumerAutoCommitWorker.java | 10 +-
.../session/subscription/SubscriptionConsumer.java | 383 ++++++++++++++++++---
.../subscription/SubscriptionEndpointsSyncer.java | 131 +++++++
.../session/subscription/SubscriptionProvider.java | 48 ++-
.../subscription/SubscriptionPullConsumer.java | 99 ++++--
.../subscription/SubscriptionPushConsumer.java | 2 +-
.../session/subscription/SubscriptionSession.java | 20 +-
.../SubscriptionSessionConnection.java | 15 -
.../session/subscription/model/Subscription.java | 6 +-
.../iotdb/session/subscription/model/Topic.java | 4 +-
.../persistence/subscription/SubscriptionInfo.java | 2 +-
.../AbstractOperateSubscriptionProcedure.java | 2 +-
.../consumer/AlterConsumerGroupProcedure.java | 2 +-
.../runtime/ConsumerGroupMetaSyncProcedure.java | 2 +-
.../subscription/CreateSubscriptionProcedure.java | 2 +-
.../subscription/DropSubscriptionProcedure.java | 2 +-
.../subscription/topic/AlterTopicProcedure.java | 2 +-
.../subscription/topic/CreateTopicProcedure.java | 2 +-
.../subscription/topic/DropTopicProcedure.java | 2 +-
.../topic/runtime/TopicMetaSyncProcedure.java | 2 +-
.../impl/DataNodeInternalRPCServiceImpl.java | 2 +-
.../agent/SubscriptionAgentLauncher.java | 2 +-
.../agent/SubscriptionConsumerAgent.java | 2 +-
.../receiver/SubscriptionReceiverV1.java | 2 +-
.../SubscriptionConnectorSubtaskManager.java | 2 +-
.../meta/consumer/ConsumerGroupMeta.java | 2 +-
.../consumer/ConsumerGroupDeSerTest.java | 2 +-
36 files changed, 964 insertions(+), 170 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
index e345c7232a3..e8cc96c3f12 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/IoTDBSubscriptionTopicIT.java
@@ -236,7 +236,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
consumer.unsubscribe("topic1");
} catch (final Exception e) {
e.printStackTrace();
- // Avoid fail
+ // Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
@@ -338,7 +338,7 @@ public class IoTDBSubscriptionTopicIT extends
AbstractSubscriptionDualIT {
consumer.unsubscribe("topic1");
} catch (final Exception e) {
e.printStackTrace();
- // Avoid fail
+ // Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
index fc175adf97d..93a02c8b836 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionBasicIT.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.subscription.it.local;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.session.subscription.SubscriptionMessage;
import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
@@ -49,7 +48,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class, ClusterIT.class})
+@Category({LocalStandaloneIT.class})
public class IoTDBSubscriptionBasicIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionBasicIT.class);
@@ -131,7 +130,7 @@ public class IoTDBSubscriptionBasicIT {
consumer.unsubscribe("topic1");
} catch (final Exception e) {
e.printStackTrace();
- // avoid fail
+ // Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
index 4912c2d1f68..ff5a86bc220 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionIdempotentIT.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.subscription.it.local;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
-import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
import org.apache.iotdb.session.subscription.SubscriptionSession;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.fail;
@RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class, ClusterIT.class})
+@Category({LocalStandaloneIT.class})
public class IoTDBSubscriptionIdempotentIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionIdempotentIT.class);
@@ -150,7 +149,7 @@ public class IoTDBSubscriptionIdempotentIT {
.autoCommit(false)
.buildPullConsumer()) {
consumer.open();
- // unsubscribe existed non-subscribed topic
+ // Unsubscribe existed non-subscribed topic
consumer.unsubscribe("topic1");
} catch (final Exception e) {
e.printStackTrace();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
index 6f909d5dc42..9e33a349e27 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/IoTDBSubscriptionRestartIT.java
@@ -24,12 +24,13 @@ import
org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
+import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
-import org.apache.iotdb.itbase.category.LocalStandaloneIT;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.session.subscription.SubscriptionMessage;
import org.apache.iotdb.session.subscription.SubscriptionPullConsumer;
@@ -57,14 +58,23 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
-@Category({LocalStandaloneIT.class, ClusterIT.class})
+@Category({ClusterIT.class})
public class IoTDBSubscriptionRestartIT {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBSubscriptionRestartIT.class);
@Before
public void setUp() throws Exception {
- EnvFactory.getEnv().initClusterEnvironment();
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+
.setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
+ .setSchemaReplicationFactor(3)
+ .setDataReplicationFactor(2);
+
+ EnvFactory.getEnv().initClusterEnvironment(3, 3);
}
@After
@@ -73,7 +83,7 @@ public class IoTDBSubscriptionRestartIT {
}
@Test
- public void testSubscriptionAfterRestart() throws Exception {
+ public void testSubscriptionAfterRestartCluster() throws Exception {
final String host = EnvFactory.getEnv().getIP();
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
@@ -174,7 +184,7 @@ public class IoTDBSubscriptionRestartIT {
consumer.unsubscribe("topic1");
} catch (final Exception e) {
e.printStackTrace();
- // avoid fail
+ // Avoid failure
} finally {
LOGGER.info("consumer exiting...");
}
@@ -197,4 +207,273 @@ public class IoTDBSubscriptionRestartIT {
thread.join();
}
}
+
+ @Test
+ public void testSubscriptionAfterRestartDataNode() throws Exception {
+ // Fetch ip and port from DN 0
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ // Create topic
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ session.createTopic("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final SubscriptionPullConsumer consumer;
+ try {
+ consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(true)
+ .heartbeatIntervalMs(1000)
+ .endpointsSyncIntervalMs(5000) // narrow endpoints sync interval
+ .buildPullConsumer();
+ consumer.open();
+ consumer.subscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ return;
+ }
+
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Shutdown DN 1 & DN 2
+ Thread.sleep(10000); // wait some time
+ EnvFactory.getEnv().shutdownDataNode(1);
+ EnvFactory.getEnv().shutdownDataNode(2);
+
+ // Subscription again
+ final Map<Long, Long> timestamps = new HashMap<>();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumerRef = consumer) {
+ while (!isClosed.get()) {
+ try {
+ Thread.sleep(1000); // wait some time
+ } catch (final InterruptedException e) {
+ break;
+ }
+ final List<SubscriptionMessage> messages;
+ try {
+ messages = consumerRef.poll(Duration.ofMillis(10000));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ continue;
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionSessionDataSets payload =
+ (SubscriptionSessionDataSets) message.getPayload();
+ for (final SubscriptionSessionDataSet dataSet : payload) {
+ while (dataSet.hasNext()) {
+ final long timestamp = dataSet.next().getTimestamp();
+ timestamps.put(timestamp, timestamp);
+ }
+ }
+ // Auto commit
+ }
+ }
+ consumerRef.unsubscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Start DN 1 & DN 2
+ Thread.sleep(10000); // wait some time
+ EnvFactory.getEnv().startDataNode(1);
+ EnvFactory.getEnv().startDataNode(2);
+ ((AbstractEnv) EnvFactory.getEnv()).checkClusterStatusWithoutUnknown();
+
+ // Insert some realtime data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 100; i < 200; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Check timestamps size
+ try {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
+
+ @Test
+ public void testSubscriptionWhenConfigNodeLeaderChange() throws Exception {
+ // Fetch ip and port from DN 0
+ final String host = EnvFactory.getEnv().getIP();
+ final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
+
+ // Create topic
+ try (final SubscriptionSession session = new SubscriptionSession(host,
port)) {
+ session.open();
+ session.createTopic("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription
+ final SubscriptionPullConsumer consumer;
+ try {
+ consumer =
+ new SubscriptionPullConsumer.Builder()
+ .host(host)
+ .port(port)
+ .consumerId("c1")
+ .consumerGroupId("cg1")
+ .autoCommit(true)
+ .heartbeatIntervalMs(1000)
+ .endpointsSyncIntervalMs(5000) // narrow endpoints sync interval
+ .buildPullConsumer();
+ consumer.open();
+ consumer.subscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ return;
+ }
+
+ // Insert some historical data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 0; i < 100; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Subscription again
+ final Map<Long, Long> timestamps = new HashMap<>();
+ final AtomicBoolean isClosed = new AtomicBoolean(false);
+ final Thread thread =
+ new Thread(
+ () -> {
+ try (final SubscriptionPullConsumer consumerRef = consumer) {
+ while (!isClosed.get()) {
+ try {
+ Thread.sleep(1000); // wait some time
+ } catch (final InterruptedException e) {
+ break;
+ }
+ final List<SubscriptionMessage> messages;
+ try {
+ messages = consumerRef.poll(Duration.ofMillis(10000));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ continue;
+ }
+ for (final SubscriptionMessage message : messages) {
+ final SubscriptionSessionDataSets payload =
+ (SubscriptionSessionDataSets) message.getPayload();
+ for (final SubscriptionSessionDataSet dataSet : payload) {
+ while (dataSet.hasNext()) {
+ final long timestamp = dataSet.next().getTimestamp();
+ timestamps.put(timestamp, timestamp);
+ }
+ }
+ // Auto commit
+ }
+ }
+ consumerRef.unsubscribe("topic1");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ // Avoid failure
+ } finally {
+ LOGGER.info("consumer exiting...");
+ }
+ });
+ thread.start();
+
+ // Shutdown leader CN
+
EnvFactory.getEnv().shutdownConfigNode(EnvFactory.getEnv().getLeaderConfigNodeIndex());
+
+ // Insert some realtime data
+ try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
+ for (int i = 100; i < 200; ++i) {
+ session.executeNonQueryStatement(
+ String.format("insert into root.db.d1(time, s1) values (%s, 1)",
i));
+ }
+ session.executeNonQueryStatement("flush");
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+
+ // Show topics and subscriptions
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
+ final TShowTopicResp showTopicResp = client.showTopic(new
TShowTopicReq());
+ Assert.assertEquals(RpcUtils.SUCCESS_STATUS.getCode(),
showTopicResp.status.getCode());
+ Assert.assertNotNull(showTopicResp.topicInfoList);
+ Assert.assertEquals(1, showTopicResp.topicInfoList.size());
+
+ final TShowSubscriptionResp showSubscriptionResp =
+ client.showSubscription(new TShowSubscriptionReq());
+ Assert.assertEquals(RpcUtils.SUCCESS_STATUS.getCode(),
showSubscriptionResp.status.getCode());
+ Assert.assertNotNull(showSubscriptionResp.subscriptionInfoList);
+ Assert.assertEquals(1, showSubscriptionResp.subscriptionInfoList.size());
+ }
+
+ // Check timestamps size
+ try {
+ // Keep retrying if there are execution failures
+ Awaitility.await()
+ .pollDelay(1, TimeUnit.SECONDS)
+ .pollInterval(1, TimeUnit.SECONDS)
+ .atMost(120, TimeUnit.SECONDS)
+ .untilAsserted(() -> Assert.assertEquals(200, timestamps.size()));
+ } catch (final Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ isClosed.set(true);
+ thread.join();
+ }
+ }
}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
index 3342f0c7706..1db77c7e03a 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/config/ConsumerConstant.java
@@ -25,20 +25,31 @@ public class ConsumerConstant {
public static final String HOST_KEY = "host";
public static final String PORT_KEY = "port";
+ public static final String NODE_URLS_KEY = "node-urls";
+
public static final String USERNAME_KEY = "username";
public static final String PASSWORD_KEY = "password";
public static final String CONSUMER_ID_KEY = "consumer-id";
public static final String CONSUMER_GROUP_ID_KEY = "group-id";
+ public static final String HEARTBEAT_INTERVAL_MS_KEY =
"heartbeat-interval-ms"; // unit: ms
+ public static final long HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE = 5000;
+ public static final long HEARTBEAT_INTERVAL_MS_MIN_VALUE = 1000;
+
+ public static final String ENDPOINTS_SYNC_INTERVAL_MS_KEY =
+ "endpoints-sync-interval-ms"; // unit: ms
+ public static final long ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE = 30000;
+ public static final long ENDPOINTS_SYNC_INTERVAL_MS_MIN_VALUE = 5000;
+
/////////////////////////////// pull consumer ///////////////////////////////
public static final String AUTO_COMMIT_KEY = "auto-commit";
public static final boolean AUTO_COMMIT_DEFAULT_VALUE = true;
- public static final String AUTO_COMMIT_INTERVAL_KEY =
"auto-commit-interval"; // unit: ms
- public static final int AUTO_COMMIT_INTERVAL_DEFAULT_VALUE = 5000;
- public static final int AUTO_COMMIT_INTERVAL_MIN_VALUE = 500;
+ public static final String AUTO_COMMIT_INTERVAL_MS_KEY =
"auto-commit-interval-ms"; // unit: ms
+ public static final long AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE = 5000;
+ public static final long AUTO_COMMIT_INTERVAL_MS_MIN_VALUE = 500;
/////////////////////////////// push consumer ///////////////////////////////
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/subscription/SubscriptionException.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionException.java
similarity index 96%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/subscription/SubscriptionException.java
rename to
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionException.java
index 064a8abf45e..eb93e29d8c0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/subscription/SubscriptionException.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionException.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.commons.exception.subscription;
+package org.apache.iotdb.rpc.subscription.exception;
import org.apache.iotdb.pipe.api.exception.PipeException;
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/SubscriptionException.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionParameterNotValidException.java
similarity index 59%
rename from
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/SubscriptionException.java
rename to
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionParameterNotValidException.java
index 8937400989b..a9992020ee9 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/SubscriptionException.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionParameterNotValidException.java
@@ -17,24 +17,30 @@
* under the License.
*/
-package org.apache.iotdb.rpc.subscription;
+package org.apache.iotdb.rpc.subscription.exception;
import java.util.Objects;
-public class SubscriptionException extends RuntimeException {
+public class SubscriptionParameterNotValidException extends
SubscriptionException {
- public SubscriptionException(String message) {
+ public SubscriptionParameterNotValidException(String message) {
super(message);
}
+ protected SubscriptionParameterNotValidException(String message, long
timeStamp) {
+ super(message, timeStamp);
+ }
+
@Override
public boolean equals(Object obj) {
- return obj instanceof SubscriptionException
- && Objects.equals(getMessage(), ((SubscriptionException)
obj).getMessage());
+ return obj instanceof SubscriptionParameterNotValidException
+ && Objects.equals(getMessage(),
((SubscriptionParameterNotValidException) obj).getMessage())
+ && Objects.equals(
+ getTimeStamp(), ((SubscriptionParameterNotValidException)
obj).getTimeStamp());
}
@Override
public int hashCode() {
- return Objects.hash(getMessage());
+ return Objects.hash(getMessage(), getTimeStamp());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/subscription/SubscriptionPollTimeOutException.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionPollTimeOutException.java
similarity index 69%
rename from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/subscription/SubscriptionPollTimeOutException.java
rename to
iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionPollTimeOutException.java
index 20ba7a63016..6d23e477c63 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/subscription/SubscriptionPollTimeOutException.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/subscription/exception/SubscriptionPollTimeOutException.java
@@ -17,7 +17,9 @@
* under the License.
*/
-package org.apache.iotdb.commons.exception.subscription;
+package org.apache.iotdb.rpc.subscription.exception;
+
+import java.util.Objects;
public class SubscriptionPollTimeOutException extends SubscriptionException {
@@ -28,4 +30,16 @@ public class SubscriptionPollTimeOutException extends
SubscriptionException {
protected SubscriptionPollTimeOutException(String message, long timeStamp) {
super(message, timeStamp);
}
+
+ @Override
+ public boolean equals(Object obj) {
+ return obj instanceof SubscriptionPollTimeOutException
+ && Objects.equals(getMessage(), ((SubscriptionPollTimeOutException)
obj).getMessage())
+ && Objects.equals(getTimeStamp(), ((SubscriptionPollTimeOutException)
obj).getTimeStamp());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getMessage(), getTimeStamp());
+ }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumerHeartbeatWorker.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumerHeartbeatWorker.java
index cf7e6f67904..12354ee7eea 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumerHeartbeatWorker.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/ConsumerHeartbeatWorker.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.session.subscription;
-import org.apache.iotdb.rpc.StatementExecutionException;
-
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +38,25 @@ public class ConsumerHeartbeatWorker implements Runnable {
return;
}
- for (SubscriptionSessionConnection connection :
consumer.getSessionConnections()) {
+ consumer.acquireWriteLock();
+ try {
+ heartbeatInternal();
+ } finally {
+ consumer.releaseWriteLock();
+ }
+ }
+
+ private void heartbeatInternal() {
+ for (final SubscriptionProvider provider : consumer.getAllProviders()) {
try {
- connection.heartbeat();
- } catch (TException | StatementExecutionException e) {
- // TODO: handle exception
- LOGGER.warn("something unexpected happened when heartbeat...", e);
+ provider.getSessionConnection().heartbeat();
+ provider.setAvailable();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "something unexpected happened when sending heartbeat to
subscription provider {}, exception: {}, set subscription provider unavailable",
+ provider,
+ e.getMessage());
+ provider.setUnavailable();
}
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/PullConsumerAutoCommitWorker.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/PullConsumerAutoCommitWorker.java
index f6817232e73..d69be7097ee 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/PullConsumerAutoCommitWorker.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/PullConsumerAutoCommitWorker.java
@@ -19,13 +19,9 @@
package org.apache.iotdb.session.subscription;
-import org.apache.iotdb.rpc.StatementExecutionException;
-
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Map;
import java.util.Set;
@@ -46,8 +42,8 @@ public class PullConsumerAutoCommitWorker implements Runnable
{
}
long currentTimestamp = System.currentTimeMillis();
- long index = currentTimestamp / consumer.getAutoCommitInterval();
- if (currentTimestamp % consumer.getAutoCommitInterval() == 0) {
+ long index = currentTimestamp / consumer.getAutoCommitIntervalMs();
+ if (currentTimestamp % consumer.getAutoCommitIntervalMs() == 0) {
index -= 1;
}
@@ -56,7 +52,7 @@ public class PullConsumerAutoCommitWorker implements Runnable
{
try {
consumer.commitSync(entry.getValue());
consumer.getUncommittedMessages().remove(entry.getKey());
- } catch (TException | IOException | StatementExecutionException e) {
+ } catch (final Exception e) {
LOGGER.warn("something unexpected happened when auto commit
messages...", e);
}
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
index 9df2885adee..4f93c97e795 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionConsumer.java
@@ -24,39 +24,55 @@ import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
+import java.util.SortedMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
public abstract class SubscriptionConsumer implements AutoCloseable {
- private final TEndPoint defaultEndPoint;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConsumer.class);
+
+ private static final IoTDBConnectionException NO_PROVIDERS_EXCEPTION =
+ new IoTDBConnectionException("Cluster has no available subscription
providers to connect");
+
+ private final List<TEndPoint> initialEndpoints;
+
private final String username;
private final String password;
private final String consumerId;
private final String consumerGroupId;
- private Map<Integer, SubscriptionProvider>
- subscriptionProviders; // contains default subscription provider, used
for poll and commit
- private SubscriptionProvider defaultSubscriptionProvider; // used for
subscribe and unsubscribe
+ private final long heartbeatIntervalMs;
+ private final long endpointsSyncIntervalMs;
+
+ private final SortedMap<Integer, SubscriptionProvider> subscriptionProviders
=
+ new ConcurrentSkipListMap<>();
+ private final ReentrantReadWriteLock subscriptionProvidersLock = new
ReentrantReadWriteLock(true);
- private static final long HEARTBEAT_INTERVAL = 5000; // unit: ms
private ScheduledExecutorService heartbeatWorkerExecutor;
+ private ScheduledExecutorService endpointsSyncerExecutor;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
@@ -71,31 +87,55 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
/////////////////////////////// ctor ///////////////////////////////
protected SubscriptionConsumer(Builder builder) {
- this.defaultEndPoint = new TEndPoint(builder.host, builder.port);
+ this.initialEndpoints = new ArrayList<>();
+ // From org.apache.iotdb.session.Session.getNodeUrls
+ // Priority is given to `host:port` over `nodeUrls`.
+ if (Objects.nonNull(builder.host)) {
+ initialEndpoints.add(new TEndPoint(builder.host, builder.port));
+ } else {
+
initialEndpoints.addAll(SessionUtils.parseSeedNodeUrls(builder.nodeUrls));
+ }
+
this.username = builder.username;
this.password = builder.password;
this.consumerId = builder.consumerId;
this.consumerGroupId = builder.consumerGroupId;
+
+ this.heartbeatIntervalMs = builder.heartbeatIntervalMs;
+ this.endpointsSyncIntervalMs = builder.endpointsSyncIntervalMs;
}
- protected SubscriptionConsumer(Builder builder, Properties config) {
+ protected SubscriptionConsumer(Builder builder, Properties properties) {
this(
builder
.host(
- (String) config.getOrDefault(ConsumerConstant.HOST_KEY,
SessionConfig.DEFAULT_HOST))
+ (String)
+ properties.getOrDefault(ConsumerConstant.HOST_KEY,
SessionConfig.DEFAULT_HOST))
.port(
(Integer)
- config.getOrDefault(ConsumerConstant.PORT_KEY,
SessionConfig.DEFAULT_PORT))
+ properties.getOrDefault(ConsumerConstant.PORT_KEY,
SessionConfig.DEFAULT_PORT))
+ .nodeUrls((List<String>)
properties.get(ConsumerConstant.NODE_URLS_KEY))
.username(
(String)
- config.getOrDefault(ConsumerConstant.USERNAME_KEY,
SessionConfig.DEFAULT_USER))
+ properties.getOrDefault(
+ ConsumerConstant.USERNAME_KEY,
SessionConfig.DEFAULT_USER))
.password(
(String)
- config.getOrDefault(
+ properties.getOrDefault(
ConsumerConstant.PASSWORD_KEY,
SessionConfig.DEFAULT_PASSWORD))
- .consumerId((String) config.get(ConsumerConstant.CONSUMER_ID_KEY))
- .consumerGroupId((String)
config.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY)));
+ .consumerId((String)
properties.get(ConsumerConstant.CONSUMER_ID_KEY))
+ .consumerGroupId((String)
properties.get(ConsumerConstant.CONSUMER_GROUP_ID_KEY))
+ .heartbeatIntervalMs(
+ (Long)
+ properties.getOrDefault(
+ ConsumerConstant.HEARTBEAT_INTERVAL_MS_KEY,
+ ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE))
+ .endpointsSyncIntervalMs(
+ (Long)
+ properties.getOrDefault(
+ ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_KEY,
+
ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE)));
}
/////////////////////////////// open & close ///////////////////////////////
@@ -106,26 +146,20 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return;
}
- subscriptionProviders = new HashMap<>();
- defaultSubscriptionProvider =
- new SubscriptionProvider(defaultEndPoint, username, password,
consumerId, consumerGroupId);
-
- int defaultDataNodeId = defaultSubscriptionProvider.handshake();
- for (Map.Entry<Integer, TEndPoint> entry :
- getDefaultSessionConnection().fetchAllEndPoints().entrySet()) {
- if (defaultDataNodeId == entry.getKey()) {
- subscriptionProviders.put(defaultDataNodeId,
defaultSubscriptionProvider);
- continue;
- }
- SubscriptionProvider subscriptionProvider =
- new SubscriptionProvider(
- entry.getValue(), username, password, consumerId,
consumerGroupId);
- subscriptionProvider.handshake();
- subscriptionProviders.put(entry.getKey(), subscriptionProvider);
+ // open subscription providers
+ acquireWriteLock();
+ try {
+ openProviders(); // throw IoTDBConnectionException
+ } finally {
+ releaseWriteLock();
}
+ // launch heartbeat worker
launchHeartbeatWorker();
+ // launch endpoints syncer
+ launchEndpointsSyncer();
+
isClosed.set(false);
}
@@ -136,48 +170,86 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
}
try {
+ // shutdown endpoints syncer
+ shutdownEndpointsSyncer();
+
// shutdown heartbeat worker
shutdownHeartbeatWorker();
- // close subscription provider
- for (SubscriptionProvider provider : subscriptionProviders.values()) {
- provider.close();
+ // close subscription providers
+ acquireWriteLock();
+ try {
+ closeProviders();
+ } finally {
+ releaseWriteLock();
}
} finally {
isClosed.set(true);
}
}
+ boolean isClosed() {
+ return isClosed.get();
+ }
+
+ /////////////////////////////// lock ///////////////////////////////
+
+ void acquireReadLock() {
+ subscriptionProvidersLock.readLock().lock();
+ }
+
+ void releaseReadLock() {
+ subscriptionProvidersLock.readLock().unlock();
+ }
+
+ void acquireWriteLock() {
+ subscriptionProvidersLock.writeLock().lock();
+ }
+
+ void releaseWriteLock() {
+ subscriptionProvidersLock.writeLock().unlock();
+ }
+
/////////////////////////////// subscribe & unsubscribe
///////////////////////////////
public void subscribe(String topicName)
- throws TException, IOException, StatementExecutionException {
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
subscribe(Collections.singleton(topicName));
}
public void subscribe(String... topicNames)
- throws TException, IOException, StatementExecutionException {
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
subscribe(new HashSet<>(Arrays.asList(topicNames)));
}
public void subscribe(Set<String> topicNames)
- throws TException, IOException, StatementExecutionException {
- getDefaultSessionConnection().subscribe(topicNames);
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
+ acquireReadLock();
+ try {
+ subscribeWithRedirection(topicNames);
+ } finally {
+ releaseReadLock();
+ }
}
public void unsubscribe(String topicName)
- throws TException, IOException, StatementExecutionException {
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
unsubscribe(Collections.singleton(topicName));
}
public void unsubscribe(String... topicNames)
- throws TException, IOException, StatementExecutionException {
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
unsubscribe(new HashSet<>(Arrays.asList(topicNames)));
}
public void unsubscribe(Set<String> topicNames)
- throws TException, IOException, StatementExecutionException {
- getDefaultSessionConnection().unsubscribe(topicNames);
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
+ acquireReadLock();
+ try {
+ unsubscribeWithRedirection(topicNames);
+ } finally {
+ releaseReadLock();
+ }
}
/////////////////////////////// heartbeat ///////////////////////////////
@@ -199,7 +271,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return t;
});
heartbeatWorkerExecutor.scheduleAtFixedRate(
- new ConsumerHeartbeatWorker(this), 0, HEARTBEAT_INTERVAL,
TimeUnit.MILLISECONDS);
+ new ConsumerHeartbeatWorker(this), 0, heartbeatIntervalMs,
TimeUnit.MILLISECONDS);
}
private void shutdownHeartbeatWorker() {
@@ -207,26 +279,209 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
heartbeatWorkerExecutor = null;
}
- boolean isClosed() {
- return isClosed.get();
+ /////////////////////////////// endpoints syncer
///////////////////////////////
+
+ @SuppressWarnings("unsafeThreadSchedule")
+ private void launchEndpointsSyncer() {
+ endpointsSyncerExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread t =
+ new Thread(
+ Thread.currentThread().getThreadGroup(), r,
"SubscriptionEndpointsSyncer", 0);
+ if (!t.isDaemon()) {
+ t.setDaemon(true);
+ }
+ if (t.getPriority() != Thread.NORM_PRIORITY) {
+ t.setPriority(Thread.NORM_PRIORITY);
+ }
+ return t;
+ });
+ endpointsSyncerExecutor.scheduleAtFixedRate(
+ new SubscriptionEndpointsSyncer(this), 0, endpointsSyncIntervalMs,
TimeUnit.MILLISECONDS);
+ }
+
+ private void shutdownEndpointsSyncer() {
+ endpointsSyncerExecutor.shutdown();
+ endpointsSyncerExecutor = null;
}
- /////////////////////////////// utility ///////////////////////////////
+ /////////////////////////////// subscription provider
///////////////////////////////
- private SubscriptionSessionConnection getDefaultSessionConnection() {
- return defaultSubscriptionProvider.getSessionConnection();
+ SubscriptionProvider constructProvider(final TEndPoint endPoint) {
+ return new SubscriptionProvider(
+ endPoint, this.username, this.password, this.consumerId,
this.consumerGroupId);
}
- protected List<SubscriptionSessionConnection> getSessionConnections() {
+ /** Caller should ensure that the method is called in the lock {@link
#acquireWriteLock()}. */
+ void openProviders() throws IoTDBConnectionException {
+ // close stale providers
+ closeProviders();
+
+ for (final TEndPoint endPoint : initialEndpoints) {
+ final SubscriptionProvider defaultProvider;
+ final int defaultDataNodeId;
+
+ try {
+ defaultProvider = constructProvider(endPoint);
+ defaultDataNodeId = defaultProvider.handshake();
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to create connection with {}, exception: {}",
endPoint, e.getMessage());
+ continue; // try next endpoint
+ }
+ addProvider(defaultDataNodeId, defaultProvider);
+
+ final Map<Integer, TEndPoint> allEndPoints;
+ try {
+ allEndPoints =
defaultProvider.getSessionConnection().fetchAllEndPoints();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to fetch all endpoints from {}, exception: {}, will retry
later...",
+ endPoint,
+ e.getMessage());
+ break; // retry later
+ }
+
+ for (final Map.Entry<Integer, TEndPoint> entry :
allEndPoints.entrySet()) {
+ if (defaultDataNodeId == entry.getKey()) {
+ continue;
+ }
+
+ final SubscriptionProvider provider;
+ try {
+ provider = constructProvider(entry.getValue());
+ provider.handshake();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to create connection with {}, exception: {}, will retry
later...",
+ entry.getValue(),
+ e.getMessage());
+ continue; // retry later
+ }
+ addProvider(entry.getKey(), provider);
+ }
+
+ break;
+ }
+
+ if (hasNoProviders()) {
+ throw NO_PROVIDERS_EXCEPTION;
+ }
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireWriteLock()}. */
+ private void closeProviders() throws IoTDBConnectionException {
+ for (final SubscriptionProvider provider : getAllProviders()) {
+ provider.close();
+ }
+ subscriptionProviders.clear();
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireWriteLock()}. */
+ void addProvider(final int dataNodeId, final SubscriptionProvider provider) {
+ // the subscription provider is opened
+ LOGGER.info("add new subscription provider {}", provider);
+ subscriptionProviders.put(dataNodeId, provider);
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireWriteLock()}. */
+ void closeAndRemoveProvider(final int dataNodeId) throws
IoTDBConnectionException {
+ if (!containsProvider(dataNodeId)) {
+ return;
+ }
+ final SubscriptionProvider provider =
subscriptionProviders.get(dataNodeId);
+ try {
+ provider.close();
+ } finally {
+ LOGGER.info("close and remove stale subscription provider {}", provider);
+ subscriptionProviders.remove(dataNodeId);
+ }
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ boolean hasNoProviders() {
+ return subscriptionProviders.isEmpty();
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ boolean containsProvider(final int dataNodeId) {
+ return subscriptionProviders.containsKey(dataNodeId);
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ List<SubscriptionProvider> getAllAvailableProviders() {
return subscriptionProviders.values().stream()
- .map(SubscriptionProvider::getSessionConnection)
+ .filter(SubscriptionProvider::isAvailable)
.collect(Collectors.toList());
}
- protected SubscriptionSessionConnection getSessionConnection(int dataNodeId)
{
- return subscriptionProviders
- .getOrDefault(dataNodeId, defaultSubscriptionProvider)
- .getSessionConnection();
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ List<SubscriptionProvider> getAllProviders() {
+ return new ArrayList<>(subscriptionProviders.values());
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ SubscriptionProvider getProvider(final int dataNodeId) {
+ return containsProvider(dataNodeId) ?
subscriptionProviders.get(dataNodeId) : null;
+ }
+
+ /////////////////////////////// redirection ///////////////////////////////
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ public void subscribeWithRedirection(final Set<String> topicNames)
+ throws IoTDBConnectionException {
+ for (final SubscriptionProvider provider : getAllAvailableProviders()) {
+ try {
+ provider.getSessionConnection().subscribe(topicNames);
+ return;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to subscribe topics {} from subscription provider {},
exception: {}, try next subscription provider...",
+ topicNames,
+ provider,
+ e.getMessage());
+ }
+ }
+ throw NO_PROVIDERS_EXCEPTION;
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ public void unsubscribeWithRedirection(final Set<String> topicNames)
+ throws IoTDBConnectionException {
+ for (final SubscriptionProvider provider : getAllAvailableProviders()) {
+ try {
+ provider.getSessionConnection().unsubscribe(topicNames);
+ return;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to unsubscribe topics {} from subscription provider {},
exception: {}, try next subscription provider...",
+ topicNames,
+ provider,
+ e.getMessage());
+ }
+ }
+ throw NO_PROVIDERS_EXCEPTION;
+ }
+
+ /** Caller should ensure that the method is called in the lock {@link
#acquireReadLock()}. */
+ public Map<Integer, TEndPoint> fetchAllEndPointsWithRedirection()
+ throws IoTDBConnectionException {
+ Map<Integer, TEndPoint> endPoints = null;
+ for (final SubscriptionProvider provider : getAllAvailableProviders()) {
+ try {
+ endPoints = provider.getSessionConnection().fetchAllEndPoints();
+ break;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to fetch all endpoints from subscription provider {},
exception: {}, try next subscription provider...",
+ provider,
+ e.getMessage());
+ }
+ }
+ if (Objects.isNull(endPoints)) {
+ throw NO_PROVIDERS_EXCEPTION;
+ }
+ return endPoints;
}
/////////////////////////////// builder ///////////////////////////////
@@ -235,6 +490,7 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
protected String host = SessionConfig.DEFAULT_HOST;
protected int port = SessionConfig.DEFAULT_PORT;
+ protected List<String> nodeUrls = null;
protected String username = SessionConfig.DEFAULT_USER;
protected String password = SessionConfig.DEFAULT_PASSWORD;
@@ -242,6 +498,10 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
protected String consumerId;
protected String consumerGroupId;
+ protected long heartbeatIntervalMs =
ConsumerConstant.HEARTBEAT_INTERVAL_MS_DEFAULT_VALUE;
+ protected long endpointsSyncIntervalMs =
+ ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_DEFAULT_VALUE;
+
public Builder host(String host) {
this.host = host;
return this;
@@ -252,6 +512,11 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return this;
}
+ public Builder nodeUrls(List<String> nodeUrls) {
+ this.nodeUrls = nodeUrls;
+ return this;
+ }
+
public Builder username(String username) {
this.username = username;
return this;
@@ -272,6 +537,18 @@ public abstract class SubscriptionConsumer implements
AutoCloseable {
return this;
}
+ public Builder heartbeatIntervalMs(long heartbeatIntervalMs) {
+ this.heartbeatIntervalMs =
+ Math.max(heartbeatIntervalMs,
ConsumerConstant.HEARTBEAT_INTERVAL_MS_MIN_VALUE);
+ return this;
+ }
+
+ public Builder endpointsSyncIntervalMs(long endpointsSyncIntervalMs) {
+ this.endpointsSyncIntervalMs =
+ Math.max(endpointsSyncIntervalMs,
ConsumerConstant.ENDPOINTS_SYNC_INTERVAL_MS_MIN_VALUE);
+ return this;
+ }
+
public abstract SubscriptionPullConsumer buildPullConsumer();
public abstract SubscriptionPushConsumer buildPushConsumer();
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
new file mode 100644
index 00000000000..d7aea1981d5
--- /dev/null
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionEndpointsSyncer.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.subscription;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class SubscriptionEndpointsSyncer implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionEndpointsSyncer.class);
+
+ private final SubscriptionConsumer consumer;
+
+ public SubscriptionEndpointsSyncer(SubscriptionConsumer consumer) {
+ this.consumer = consumer;
+ }
+
+ @Override
+ public void run() {
+ if (consumer.isClosed()) {
+ return;
+ }
+
+ consumer.acquireWriteLock();
+ try {
+ syncInternal();
+ } finally {
+ consumer.releaseWriteLock();
+ }
+ }
+
+ private void syncInternal() {
+ if (consumer.hasNoProviders()) {
+ try {
+ consumer.openProviders();
+ } catch (final IoTDBConnectionException e) {
+ LOGGER.warn("something unexpected happened when syncing subscription
endpoints...", e);
+ return;
+ }
+ }
+
+ final Map<Integer, TEndPoint> allEndPoints;
+ try {
+ allEndPoints = consumer.fetchAllEndPointsWithRedirection();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to fetch all endpoints, exception: {}, will retry later...",
e.getMessage());
+ return; // retry later
+ }
+
+ // add new providers or handshake existing providers
+ for (final Map.Entry<Integer, TEndPoint> entry : allEndPoints.entrySet()) {
+ final SubscriptionProvider provider =
consumer.getProvider(entry.getKey());
+ if (Objects.isNull(provider)) {
+ // new provider
+ final SubscriptionProvider newProvider =
consumer.constructProvider(entry.getValue());
+ try {
+ newProvider.handshake();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Failed to create connection with subscription provider {},
exception: {}, will retry later...",
+ newProvider,
+ e.getMessage());
+ continue; // retry later
+ }
+ consumer.addProvider(entry.getKey(), newProvider);
+ } else {
+ // existing provider
+ try {
+ provider.getSessionConnection().heartbeat();
+ provider.setAvailable();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "something unexpected happened when sending heartbeat to
subscription provider {}, exception: {}, set subscription provider unavailable",
+ provider,
+ e.getMessage());
+ provider.setUnavailable();
+ }
+ // close and remove unavailable provider (reset the connection as much
as possible)
+ if (!provider.isAvailable()) {
+ try {
+ consumer.closeAndRemoveProvider(entry.getKey());
+ } catch (final IoTDBConnectionException e) {
+ LOGGER.warn(
+ "Exception occurred when closing and removing subscription
provider with data node id {}: {}",
+ entry.getKey(),
+ e.getMessage());
+ }
+ }
+ }
+ }
+
+ // close and remove stale providers
+ for (final SubscriptionProvider provider : consumer.getAllProviders()) {
+ final int dataNodeId = provider.getDataNodeId();
+ if (!allEndPoints.containsKey(dataNodeId)) {
+ try {
+ consumer.closeAndRemoveProvider(dataNodeId);
+ } catch (final IoTDBConnectionException e) {
+ LOGGER.warn(
+ "Exception occurred when closing and removing subscription
provider with data node id {}: {}",
+ dataNodeId,
+ e.getMessage());
+ }
+ }
+ }
+ }
+}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
index 33fa4215fbf..45006f43f64 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionProvider.java
@@ -38,6 +38,10 @@ final class SubscriptionProvider extends SubscriptionSession
{
private final String consumerGroupId;
private final AtomicBoolean isClosed = new AtomicBoolean(true);
+ private final AtomicBoolean isAvailable = new AtomicBoolean(false);
+
+ private final TEndPoint endPoint;
+ private int dataNodeId;
SubscriptionProvider(
TEndPoint endPoint,
@@ -47,6 +51,7 @@ final class SubscriptionProvider extends SubscriptionSession {
String consumerGroupId) {
super(endPoint.ip, endPoint.port, username, password);
+ this.endPoint = endPoint;
this.consumerId = consumerId;
this.consumerGroupId = consumerGroupId;
}
@@ -59,12 +64,13 @@ final class SubscriptionProvider extends
SubscriptionSession {
super.open();
- Map<String, String> consumerAttributes = new HashMap<>();
+ final Map<String, String> consumerAttributes = new HashMap<>();
consumerAttributes.put(ConsumerConstant.CONSUMER_GROUP_ID_KEY,
consumerGroupId);
consumerAttributes.put(ConsumerConstant.CONSUMER_ID_KEY, consumerId);
- int dataNodeId = getSessionConnection().handshake(new
ConsumerConfig(consumerAttributes));
+ dataNodeId = getSessionConnection().handshake(new
ConsumerConfig(consumerAttributes));
isClosed.set(false);
+ setAvailable();
return dataNodeId;
}
@@ -81,6 +87,7 @@ final class SubscriptionProvider extends SubscriptionSession {
throw new IoTDBConnectionException(e);
} finally {
super.close();
+ setUnavailable();
isClosed.set(true);
}
}
@@ -88,4 +95,41 @@ final class SubscriptionProvider extends SubscriptionSession
{
SubscriptionSessionConnection getSessionConnection() {
return (SubscriptionSessionConnection) defaultSessionConnection;
}
+
+ boolean isAvailable() {
+ return isAvailable.get();
+ }
+
+ void setAvailable() {
+ isAvailable.set(true);
+ }
+
+ void setUnavailable() {
+ isAvailable.set(false);
+ }
+
+ int getDataNodeId() {
+ return dataNodeId;
+ }
+
+ TEndPoint getEndPoint() {
+ return endPoint;
+ }
+
+ @Override
+ public String toString() {
+ return "SubscriptionProvider{endPoint="
+ + endPoint
+ + ", dataNodeId="
+ + dataNodeId
+ + ", consumerId="
+ + consumerId
+ + ", consumerGroupId="
+ + consumerGroupId
+ + ", isAvailable="
+ + isAvailable
+ + ", isClosed="
+ + isClosed
+ + "}";
+ }
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
index 97b04d563c8..892ae5a44bf 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPullConsumer.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.session.subscription;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.subscription.SubscriptionException;
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.thrift.TException;
@@ -36,6 +36,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
@@ -52,7 +53,7 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionPullConsumer.class);
private final boolean autoCommit;
- private final int autoCommitInterval;
+ private final long autoCommitIntervalMs;
private ScheduledExecutorService autoCommitWorkerExecutor;
private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;
@@ -65,26 +66,29 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
super(builder);
this.autoCommit = builder.autoCommit;
- this.autoCommitInterval = builder.autoCommitInterval;
+ this.autoCommitIntervalMs = builder.autoCommitIntervalMs;
}
- public SubscriptionPullConsumer(Properties config) {
+ public SubscriptionPullConsumer(Properties properties) {
this(
- config,
+ properties,
(Boolean)
- config.getOrDefault(
+ properties.getOrDefault(
ConsumerConstant.AUTO_COMMIT_KEY,
ConsumerConstant.AUTO_COMMIT_DEFAULT_VALUE),
- (Integer)
- config.getOrDefault(
- ConsumerConstant.AUTO_COMMIT_INTERVAL_KEY,
- ConsumerConstant.AUTO_COMMIT_INTERVAL_DEFAULT_VALUE));
+ (Long)
+ properties.getOrDefault(
+ ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_KEY,
+ ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE));
}
- private SubscriptionPullConsumer(Properties config, boolean autoCommit, int
autoCommitInterval) {
- super(new
Builder().autoCommit(autoCommit).autoCommitInterval(autoCommitInterval),
config);
+ private SubscriptionPullConsumer(
+ Properties properties, boolean autoCommit, long autoCommitIntervalMs) {
+ super(
+ new
Builder().autoCommit(autoCommit).autoCommitIntervalMs(autoCommitIntervalMs),
+ properties);
this.autoCommit = autoCommit;
- this.autoCommitInterval = autoCommitInterval;
+ this.autoCommitIntervalMs = autoCommitIntervalMs;
}
/////////////////////////////// open & close ///////////////////////////////
@@ -143,34 +147,42 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
public List<SubscriptionMessage> poll(Set<String> topicNames, long timeoutMs)
throws TException, IOException, StatementExecutionException {
- // TODO: network timeout
List<EnrichedTablets> enrichedTabletsList = new ArrayList<>();
- for (SubscriptionSessionConnection connection : getSessionConnections()) {
- enrichedTabletsList.addAll(connection.poll(topicNames, timeoutMs));
+
+ acquireReadLock();
+ try {
+ for (final SubscriptionProvider provider : getAllAvailableProviders()) {
+ // TODO: network timeout
+
enrichedTabletsList.addAll(provider.getSessionConnection().poll(topicNames,
timeoutMs));
+ }
+ } finally {
+ releaseReadLock();
}
List<SubscriptionMessage> messages =
enrichedTabletsList.stream().map(SubscriptionMessage::new).collect(Collectors.toList());
+
if (autoCommit) {
long currentTimestamp = System.currentTimeMillis();
- long index = currentTimestamp / autoCommitInterval;
- if (currentTimestamp % autoCommitInterval == 0) {
+ long index = currentTimestamp / autoCommitIntervalMs;
+ if (currentTimestamp % autoCommitIntervalMs == 0) {
index -= 1;
}
uncommittedMessages
.computeIfAbsent(index, o -> new ConcurrentSkipListSet<>())
.addAll(messages);
}
+
return messages;
}
public void commitSync(SubscriptionMessage message)
- throws TException, IOException, StatementExecutionException {
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
commitSync(Collections.singletonList(message));
}
public void commitSync(Iterable<SubscriptionMessage> messages)
- throws TException, IOException, StatementExecutionException {
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
Map<Integer, Map<String, List<String>>>
dataNodeIdToTopicNameToSubscriptionCommitIds =
new HashMap<>();
for (SubscriptionMessage message : messages) {
@@ -190,8 +202,20 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
private void commitSyncInternal(
int dataNodeId, Map<String, List<String>>
topicNameToSubscriptionCommitIds)
- throws TException, IOException, StatementExecutionException {
-
getSessionConnection(dataNodeId).commitSync(topicNameToSubscriptionCommitIds);
+ throws TException, IOException, StatementExecutionException,
IoTDBConnectionException {
+ acquireReadLock();
+ try {
+ final SubscriptionProvider provider = getProvider(dataNodeId);
+ if (Objects.isNull(provider) || !provider.isAvailable()) {
+ throw new IoTDBConnectionException(
+ String.format(
+ "something unexpected happened when commit messages to
subscription provider with data node id %s, the subscription provider may be
unavailable or not existed",
+ dataNodeId));
+ }
+
provider.getSessionConnection().commitSync(topicNameToSubscriptionCommitIds);
+ } finally {
+ releaseReadLock();
+ }
}
/////////////////////////////// auto commit ///////////////////////////////
@@ -217,7 +241,7 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
return t;
});
autoCommitWorkerExecutor.scheduleAtFixedRate(
- new PullConsumerAutoCommitWorker(this), 0, autoCommitInterval,
TimeUnit.MILLISECONDS);
+ new PullConsumerAutoCommitWorker(this), 0, autoCommitIntervalMs,
TimeUnit.MILLISECONDS);
}
private void shutdownAutoCommitWorker() {
@@ -230,7 +254,7 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
try {
commitSync(entry.getValue());
uncommittedMessages.remove(entry.getKey());
- } catch (TException | IOException | StatementExecutionException e) {
+ } catch (final Exception e) {
LOGGER.warn("something unexpected happened when commit messages during
close", e);
}
}
@@ -240,8 +264,8 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
return isClosed.get();
}
- int getAutoCommitInterval() {
- return autoCommitInterval;
+ long getAutoCommitIntervalMs() {
+ return autoCommitIntervalMs;
}
SortedMap<Long, Set<SubscriptionMessage>> getUncommittedMessages() {
@@ -253,7 +277,7 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
public static class Builder extends SubscriptionConsumer.Builder {
private boolean autoCommit = ConsumerConstant.AUTO_COMMIT_DEFAULT_VALUE;
- private int autoCommitInterval =
ConsumerConstant.AUTO_COMMIT_INTERVAL_DEFAULT_VALUE;
+ private long autoCommitIntervalMs =
ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_DEFAULT_VALUE;
public Builder host(String host) {
super.host(host);
@@ -265,6 +289,11 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
return this;
}
+ public Builder nodeUrls(List<String> nodeUrls) {
+ super.nodeUrls(nodeUrls);
+ return this;
+ }
+
public Builder username(String username) {
super.username(username);
return this;
@@ -285,14 +314,24 @@ public class SubscriptionPullConsumer extends
SubscriptionConsumer {
return this;
}
+ public Builder heartbeatIntervalMs(long heartbeatIntervalMs) {
+ super.heartbeatIntervalMs(heartbeatIntervalMs);
+ return this;
+ }
+
+ public Builder endpointsSyncIntervalMs(long endpointsSyncIntervalMs) {
+ super.endpointsSyncIntervalMs(endpointsSyncIntervalMs);
+ return this;
+ }
+
public Builder autoCommit(boolean autoCommit) {
this.autoCommit = autoCommit;
return this;
}
- public Builder autoCommitInterval(int autoCommitInterval) {
- this.autoCommitInterval =
- Math.max(autoCommitInterval,
ConsumerConstant.AUTO_COMMIT_INTERVAL_MIN_VALUE);
+ public Builder autoCommitIntervalMs(long autoCommitIntervalMs) {
+ this.autoCommitIntervalMs =
+ Math.max(autoCommitIntervalMs,
ConsumerConstant.AUTO_COMMIT_INTERVAL_MS_MIN_VALUE);
return this;
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
index 602ea07a813..950a396d58d 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionPushConsumer.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.session.subscription;
-import org.apache.iotdb.rpc.subscription.SubscriptionException;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
// TODO
public class SubscriptionPushConsumer extends SubscriptionConsumer {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
index 8346277e2b5..c286c801489 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSession.java
@@ -24,7 +24,8 @@ import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.subscription.SubscriptionException;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
+import
org.apache.iotdb.rpc.subscription.exception.SubscriptionParameterNotValidException;
import org.apache.iotdb.session.Session;
import org.apache.iotdb.session.SessionConnection;
import org.apache.iotdb.session.subscription.model.Subscription;
@@ -35,6 +36,7 @@ import org.apache.iotdb.tsfile.read.common.RowRecord;
import java.time.ZoneId;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -55,15 +57,17 @@ public class SubscriptionSession extends Session {
.username(username)
.password(password)
// disable auto fetch
- .enableAutoFetch(false));
+ .enableAutoFetch(false)
+ // disable redirection
+ .enableRedirection(false));
}
@Override
public SessionConnection constructSessionConnection(
Session session, TEndPoint endpoint, ZoneId zoneId) throws
IoTDBConnectionException {
- if (endpoint == null) {
- return new SubscriptionSessionConnection(
- session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
+ if (Objects.isNull(endpoint)) {
+ throw new SubscriptionParameterNotValidException(
+ "Subscription session must be configured with an endpoint.");
}
return new SubscriptionSessionConnection(
session, endpoint, zoneId, availableNodes, maxRetryCount,
retryIntervalInMs);
@@ -77,14 +81,14 @@ public class SubscriptionSession extends Session {
executeNonQueryStatement(sql);
}
- public void createTopic(String topicName, Properties config)
+ public void createTopic(String topicName, Properties properties)
throws IoTDBConnectionException, StatementExecutionException {
- if (config.isEmpty()) {
+ if (properties.isEmpty()) {
createTopic(topicName);
}
final StringBuilder sb = new StringBuilder();
sb.append('(');
- config.forEach(
+ properties.forEach(
(k, v) ->
sb.append('\'')
.append(k)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
index b896e7cb189..891cd0fc4ff 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionConnection.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.session.subscription;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
-import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
@@ -70,16 +69,6 @@ public class SubscriptionSessionConnection extends
SessionConnection {
super(session, endPoint, zoneId, availableNodes, maxRetryCount,
retryIntervalInMs);
}
- public SubscriptionSessionConnection(
- Session session,
- ZoneId zoneId,
- Supplier<List<TEndPoint>> availableNodes,
- int maxRetryCount,
- long retryIntervalInMs)
- throws IoTDBConnectionException {
- super(session, zoneId, availableNodes, maxRetryCount, retryIntervalInMs);
- }
-
// from org.apache.iotdb.session.NodesSupplier.updateDataNodeList
public Map<Integer, TEndPoint> fetchAllEndPoints()
throws IoTDBConnectionException, StatementExecutionException {
@@ -93,10 +82,6 @@ public class SubscriptionSessionConnection extends
SessionConnection {
}
String ip = iterator.getString(IP_COLUMN_NAME);
String port = iterator.getString(PORT_COLUMN_NAME);
- // TODO: check logic
- if ("0.0.0.0".equals(ip)) {
- ip = SessionConfig.DEFAULT_HOST;
- }
if (ip != null && port != null) {
endPoints.put(
iterator.getInt(NODE_ID_COLUMN_NAME), new TEndPoint(ip,
Integer.parseInt(port)));
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
index c82c5261222..e5f227be17b 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Subscription.java
@@ -21,9 +21,9 @@ package org.apache.iotdb.session.subscription.model;
public class Subscription {
- private String topicName;
- private String consumerGroupId;
- private String consumerIds;
+ private final String topicName;
+ private final String consumerGroupId;
+ private final String consumerIds;
public Subscription(String topicName, String consumerGroupId, String
consumerIds) {
this.topicName = topicName;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Topic.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Topic.java
index 8a0d9e9e48d..7b1feb97a1a 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Topic.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/model/Topic.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.session.subscription.model;
public class Topic {
- private String topicName;
- private String topicAttributes;
+ private final String topicName;
+ private final String topicAttributes;
public Topic(String topicName, String topicAttributes) {
this.topicName = topicName;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index f3e024a7a41..b020ffc045d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.persistence.subscription;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import
org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMetaKeeper;
@@ -43,6 +42,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index e19e3e873e1..a15382ba484 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
@@ -35,6 +34,7 @@ import
org.apache.iotdb.confignode.procedure.state.subscription.OperateSubscript
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
index 49898b55753..b21d9018362 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.consumer;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.AlterConsumerGroupPlan;
@@ -32,6 +31,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index c35e736ab13..5f915113a1c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -20,7 +20,6 @@
package
org.apache.iotdb.confignode.procedure.impl.subscription.consumer.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.consumer.runtime.ConsumerGroupHandleMetaChangePlan;
@@ -32,6 +31,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 4fa93dfd1b4..86f49979648 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
@@ -41,6 +40,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
index ccdde5be71b..7796939e3ea 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.subscription;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
@@ -40,6 +39,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
index 5fd506dbc35..6df6f111322 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.AlterTopicPlan;
import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
@@ -31,6 +30,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index 9dc1584f35e..3732875fa02 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.CreateTopicPlan;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
@@ -32,6 +31,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
index 879b6382ee4..b5b6e335e60 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.DropTopicPlan;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import
org.apache.iotdb.confignode.procedure.impl.subscription.AbstractOperateSubscriptionProcedure;
@@ -29,6 +28,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
import org.slf4j.Logger;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index 35a51c19eb6..7d8cef7b22e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.confignode.procedure.impl.subscription.topic.runtime;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import
org.apache.iotdb.confignode.consensus.request.write.subscription.topic.runtime.TopicHandleMetaChangePlan;
@@ -32,6 +31,7 @@ import
org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7acd30dff78..ae9ded78d1a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -39,7 +39,6 @@ import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathDeserializeUtil;
import org.apache.iotdb.commons.path.PathPatternTree;
@@ -221,6 +220,7 @@ import org.apache.iotdb.mpp.rpc.thrift.TUpdateTemplateReq;
import org.apache.iotdb.mpp.rpc.thrift.TUpdateTriggerLocationReq;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.trigger.api.enums.FailureStrategy;
import org.apache.iotdb.trigger.api.enums.TriggerEvent;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
index 2a38403979d..32dd078dd78 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionAgentLauncher.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.subscription.agent;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
@@ -33,6 +32,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import
org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaRespExceptionMessage;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
index d4b30e99940..d8cd497ff4b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java
@@ -19,10 +19,10 @@
package org.apache.iotdb.db.subscription.agent;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import
org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMetaKeeper;
import
org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaRespExceptionMessage;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
index d0e35211816..8dfaf86d812 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/receiver/SubscriptionReceiverV1.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.consensus.ConfigRegionId;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
@@ -39,6 +38,7 @@ import
org.apache.iotdb.db.subscription.timer.SubscriptionPollTimer;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.rpc.subscription.config.ConsumerConfig;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.rpc.subscription.payload.EnrichedTablets;
import org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCloseReq;
import
org.apache.iotdb.rpc.subscription.payload.request.PipeSubscribeCommitReq;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
index ab5040f04ff..ef4c7e578b1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/task/subtask/SubscriptionConnectorSubtaskManager.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.subscription.task.subtask;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant;
import
org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
@@ -37,6 +36,7 @@ import
org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.exception.PipeException;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import java.util.Arrays;
import java.util.HashMap;
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
index 0b4fca31c18..c9534e4669b 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.commons.subscription.meta.consumer;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
diff --git
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
index a950bf7a84c..d41b3706cd1 100644
---
a/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
+++
b/iotdb-core/node-commons/src/test/java/org/apache/iotdb/commons/subscription/consumer/ConsumerGroupDeSerTest.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.commons.subscription.consumer;
-import org.apache.iotdb.commons.exception.subscription.SubscriptionException;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerMeta;
+import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
import org.junit.Assert;
import org.junit.Test;