This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 1a33299e4f1 Subscription: add config to enable or disable subscription
(#15820) (#15822)
1a33299e4f1 is described below
commit 1a33299e4f13a0125f0c09e6af19d137ee9c94b2
Author: VGalaxies <[email protected]>
AuthorDate: Wed Jul 2 10:32:10 2025 +0800
Subscription: add config to enable or disable subscription (#15820) (#15822)
---
.../it/env/cluster/config/MppCommonConfig.java | 6 +++++
.../env/cluster/config/MppSharedCommonConfig.java | 7 +++++
.../it/env/remote/config/RemoteCommonConfig.java | 5 ++++
.../org/apache/iotdb/itbase/env/CommonConfig.java | 2 ++
.../it/cluster/IoTDBSubscriptionRestartIT.java | 4 +++
.../it/dual/AbstractSubscriptionDualIT.java | 4 +++
.../it/local/AbstractSubscriptionLocalIT.java | 2 ++
.../it/triple/AbstractSubscriptionTripleIT.java | 5 ++++
.../apache/iotdb/tools/it/ExportTsFileTestIT.java | 3 +++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../consumer/SubscriptionProvider.java | 1 +
.../task/execution/PipeSubtaskExecutorManager.java | 6 ++++-
.../impl/DataNodeInternalRPCServiceImpl.java | 26 ++++++++++++++++++
.../config/executor/ClusterConfigTaskExecutor.java | 31 ++++++++++++++++++++++
.../agent/SubscriptionReceiverAgent.java | 16 +++++++++--
.../agent/SubscriptionRuntimeAgent.java | 4 +++
.../apache/iotdb/commons/conf/CommonConfig.java | 10 +++++++
.../iotdb/commons/conf/CommonDescriptor.java | 5 ++++
.../commons/pipe/agent/task/PipeTaskAgent.java | 8 ++++++
.../subscription/config/SubscriptionConfig.java | 6 +++++
20 files changed, 149 insertions(+), 3 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index f1a820e2112..fe4190d7708 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -502,6 +502,12 @@ public class MppCommonConfig extends MppBaseConfig
implements CommonConfig {
return this;
}
+ @Override
+ public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
+ setProperty("subscription_enabled", String.valueOf(subscriptionEnabled));
+ return this;
+ }
+
@Override
public CommonConfig setDefaultStorageGroupLevel(int
defaultStorageGroupLevel) {
setProperty("default_storage_group_level",
String.valueOf(defaultStorageGroupLevel));
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index c81680c5230..c2ade6eace0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -513,6 +513,13 @@ public class MppSharedCommonConfig implements CommonConfig
{
return this;
}
+ @Override
+ public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
+ dnConfig.setSubscriptionEnabled(subscriptionEnabled);
+ cnConfig.setSubscriptionEnabled(subscriptionEnabled);
+ return this;
+ }
+
@Override
public CommonConfig setDefaultStorageGroupLevel(int
defaultStorageGroupLevel) {
dnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 3d57745ed68..581c5f4345e 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -360,4 +360,9 @@ public class RemoteCommonConfig implements CommonConfig {
public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
return this;
}
+
+ @Override
+ public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index dbba83aed58..16d50954209 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -160,6 +160,8 @@ public interface CommonConfig {
CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
+ CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled);
+
default CommonConfig setDefaultStorageGroupLevel(int
defaultStorageGroupLevel) {
return this;
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
index fee93bacdd5..de17dec1ed4 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
@@ -71,6 +71,10 @@ public class IoTDBSubscriptionRestartIT extends
AbstractSubscriptionIT {
public void setUp() throws Exception {
super.setUp();
+ // enable subscription
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
+
+ // set cluster env
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 27d8c3c493f..594f9efe691 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -47,6 +47,10 @@ public abstract class AbstractSubscriptionDualIT extends
AbstractSubscriptionIT
}
protected void setUpConfig() {
+ // enable subscription
+ senderEnv.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+ receiverEnv.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+
// enable auto create schema
senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
index 5fa7c5808fa..3f415051928 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
@@ -32,6 +32,8 @@ public abstract class AbstractSubscriptionLocalIT extends
AbstractSubscriptionIT
public void setUp() throws Exception {
super.setUp();
+ // enable subscription
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
EnvFactory.getEnv().initClusterEnvironment();
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
index 9622f727908..3b49eb8ed13 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
@@ -55,6 +55,11 @@ public abstract class AbstractSubscriptionTripleIT extends
AbstractSubscriptionI
}
protected void setUpConfig() {
+ // enable subscription
+ sender.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+ receiver1.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+ receiver2.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+
// enable auto create schema
sender.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
receiver1.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
diff --git
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
index 57b76e55e81..6ad0c843e27 100644
---
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
@@ -52,7 +52,10 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
@BeforeClass
public static void setUp() throws Exception {
+ // enable subscription
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
EnvFactory.getEnv().initClusterEnvironment();
+
ip = EnvFactory.getEnv().getIP();
port = EnvFactory.getEnv().getPort();
toolsPath = EnvFactory.getEnv().getToolsPath();
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 28569ebd7f2..81f136fadd0 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -273,6 +273,7 @@ public enum TSStatusCode {
SUBSCRIPTION_MISSING_CUSTOMER(1909),
SHOW_SUBSCRIPTION_ERROR(1910),
SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911),
+ SUBSCRIPTION_NOT_ENABLED_ERROR(1912),
// Topic
CREATE_TOPIC_ERROR(2000),
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 8d50fda6070..03eefeaddba 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -413,6 +413,7 @@ final class SubscriptionProvider extends
SubscriptionSession {
case 1900: // SUBSCRIPTION_VERSION_ERROR
case 1901: // SUBSCRIPTION_TYPE_ERROR
case 1909: // SUBSCRIPTION_MISSING_CUSTOMER
+ case 1912: // SUBSCRIPTION_NOT_ENABLED_ERROR
default:
{
final String errorMessage =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
index eadb75463bb..f0de4d8d58e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.agent.task.execution;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.db.pipe.consensus.PipeConsensusSubtaskExecutor;
import
org.apache.iotdb.db.subscription.task.execution.SubscriptionSubtaskExecutor;
@@ -53,7 +54,10 @@ public class PipeSubtaskExecutorManager {
private PipeSubtaskExecutorManager() {
processorExecutor = new PipeProcessorSubtaskExecutor();
connectorExecutor = new PipeConnectorSubtaskExecutor();
- subscriptionExecutor = new SubscriptionSubtaskExecutor();
+ subscriptionExecutor =
+ SubscriptionConfig.getInstance().getSubscriptionEnabled()
+ ? new SubscriptionSubtaskExecutor()
+ : null;
consensusExecutor = new PipeConsensusSubtaskExecutor();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7732814c109..9eb5446f0cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -1158,6 +1159,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TPushTopicMetaResp pushTopicMeta(TPushTopicMetaReq req) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return new TPushTopicMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
final List<TopicMeta> topicMetas = new ArrayList<>();
for (ByteBuffer byteBuffer : req.getTopicMetas()) {
topicMetas.add(TopicMeta.deserialize(byteBuffer));
@@ -1181,6 +1187,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TPushTopicMetaResp pushSingleTopicMeta(TPushSingleTopicMetaReq req) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return new TPushTopicMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
try {
final TPushTopicMetaRespExceptionMessage exceptionMessage;
if (req.isSetTopicNameToDrop()) {
@@ -1209,6 +1220,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TPushTopicMetaResp pushMultiTopicMeta(TPushMultiTopicMetaReq req) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return new TPushTopicMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
boolean hasException = false;
// If there is any exception, we use the size of exceptionMessages to
record the fail index
List<TPushTopicMetaRespExceptionMessage> exceptionMessages = new
ArrayList<>();
@@ -1256,6 +1272,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TPushConsumerGroupMetaResp
pushConsumerGroupMeta(TPushConsumerGroupMetaReq req) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return new TPushConsumerGroupMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
final List<ConsumerGroupMeta> consumerGroupMetas = new ArrayList<>();
for (ByteBuffer byteBuffer : req.getConsumerGroupMetas()) {
consumerGroupMetas.add(ConsumerGroupMeta.deserialize(byteBuffer));
@@ -1280,6 +1301,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
@Override
public TPushConsumerGroupMetaResp pushSingleConsumerGroupMeta(
TPushSingleConsumerGroupMetaReq req) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return new TPushConsumerGroupMetaResp()
+ .setStatus(new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+ }
+
try {
final TPushConsumerGroupMetaRespExceptionMessage exceptionMessage;
if (req.isSetConsumerGroupNameToDrop()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index abdecf6ba73..c0436818de1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -53,6 +53,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
import
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.commons.udf.service.UDFClassLoader;
@@ -311,6 +312,16 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
private ClusterConfigTaskExecutorHolder() {}
}
+ private static final SettableFuture<ConfigTaskResult>
SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+
+ static {
+ SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create();
+ SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException(
+ new IoTDBException(
+ "Subscription not enabled, please set config `subscription_enabled` to true.",
+ TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode()));
+ }
+
public static ClusterConfigTaskExecutor getInstance() {
return ClusterConfigTaskExecutor.ClusterConfigTaskExecutorHolder.INSTANCE;
}
@@ -2080,6 +2091,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> showSubscriptions(
ShowSubscriptionsStatement showSubscriptionsStatement) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+ }
+
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient configNodeClient =
@@ -2113,6 +2128,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> dropSubscription(
final DropSubscriptionStatement dropSubscriptionStatement) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+ }
+
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
@@ -2134,6 +2153,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement
createTopicStatement) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+ }
+
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
final String topicName = createTopicStatement.getTopicName();
@@ -2198,6 +2221,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> dropTopic(DropTopicStatement
dropTopicStatement) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+ }
+
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
@@ -2221,6 +2248,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
@Override
public SettableFuture<ConfigTaskResult> showTopics(ShowTopicsStatement
showTopicsStatement) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+ }
+
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
index 0eb32e21f5d..2e2f27b0a02 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
@@ -42,17 +42,29 @@ public class SubscriptionReceiverAgent {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionReceiverAgent.class);
- private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new
ThreadLocal<>();
-
private static final Map<Byte, Supplier<SubscriptionReceiver>>
RECEIVER_CONSTRUCTORS =
new HashMap<>();
+ private static final TPipeSubscribeResp SUBSCRIPTION_NOT_ENABLED_ERROR_RESP =
+ new TPipeSubscribeResp(
+ RpcUtils.getStatus(
+ TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR,
+ "Subscription not enabled, please set config `subscription_enabled` to true."),
+ PipeSubscribeResponseVersion.VERSION_1.getVersion(),
+ PipeSubscribeResponseType.ACK.getType());
+
+ private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new
ThreadLocal<>();
+
SubscriptionReceiverAgent() {
RECEIVER_CONSTRUCTORS.put(
PipeSubscribeRequestVersion.VERSION_1.getVersion(),
SubscriptionReceiverV1::new);
}
public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return SUBSCRIPTION_NOT_ENABLED_ERROR_RESP;
+ }
+
final byte reqVersion = req.getVersion();
if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
return getReceiver(reqVersion).handle(req);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
index 27c75b62dce..aec16568463 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
@@ -62,6 +62,10 @@ public class SubscriptionRuntimeAgent implements IService {
@Override
public void start() throws StartupException {
+ if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return;
+ }
+
SubscriptionConfig.getInstance().printAllConfigs();
SubscriptionAgentLauncher.launchSubscriptionTopicAgent();
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 59d348abe5c..038581bf1f5 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -326,6 +326,8 @@ public class CommonConfig {
private boolean pipeEventReferenceTrackingEnabled = true;
private long pipeEventReferenceEliminateIntervalSeconds = 10;
+ private boolean subscriptionEnabled = false;
+
private float subscriptionCacheMemoryUsagePercentage = 0.2F;
private int subscriptionSubtaskExecutorMaxThreadNum = 2;
@@ -2150,6 +2152,14 @@ public class CommonConfig {
pipeEventReferenceEliminateIntervalSeconds);
}
+ public boolean getSubscriptionEnabled() {
+ return subscriptionEnabled;
+ }
+
+ public void setSubscriptionEnabled(boolean subscriptionEnabled) {
+ this.subscriptionEnabled = subscriptionEnabled;
+ }
+
public float getSubscriptionCacheMemoryUsagePercentage() {
return subscriptionCacheMemoryUsagePercentage;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c17723cde3c..0432415637f 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -252,6 +252,11 @@ public class CommonDescriptor {
}
private void loadSubscriptionProps(TrimProperties properties) {
+ config.setSubscriptionEnabled(
+ Boolean.parseBoolean(
+ properties.getProperty(
+ "subscription_enabled",
String.valueOf(config.getSubscriptionEnabled()))));
+
config.setSubscriptionCacheMemoryUsagePercentage(
Float.parseFloat(
properties.getProperty(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index aeafccc2196..353da8c84e4 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -33,6 +33,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent;
import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
@@ -165,6 +166,13 @@ public abstract class PipeTaskAgent {
private void executeSinglePipeMetaChanges(final PipeMeta metaFromCoordinator)
throws IllegalPathException {
final String pipeName = metaFromCoordinator.getStaticMeta().getPipeName();
+
+ // Skip subscription pipes entirely when subscription is disabled
+ if (PipeStaticMeta.isSubscriptionPipe(pipeName)
+ && !SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+ return;
+ }
+
final PipeMeta metaInAgent = pipeMetaKeeper.getPipeMeta(pipeName);
// If pipe meta does not exist on local agent, create a new pipe
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index a332b87f0ed..c58ae725888 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -29,6 +29,10 @@ public class SubscriptionConfig {
private static final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
+ public boolean getSubscriptionEnabled() {
+ return COMMON_CONFIG.getSubscriptionEnabled();
+ }
+
public float getSubscriptionCacheMemoryUsagePercentage() {
return COMMON_CONFIG.getSubscriptionCacheMemoryUsagePercentage();
}
@@ -138,6 +142,8 @@ public class SubscriptionConfig {
private static final Logger LOGGER =
LoggerFactory.getLogger(SubscriptionConfig.class);
public void printAllConfigs() {
+ LOGGER.info("SubscriptionEnabled: {}", getSubscriptionEnabled());
+
LOGGER.info(
"SubscriptionCacheMemoryUsagePercentage: {}",
getSubscriptionCacheMemoryUsagePercentage());
LOGGER.info(