This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 1a33299e4f1 Subscription: add config to enable or disable subscription (#15820) (#15822)
1a33299e4f1 is described below

commit 1a33299e4f13a0125f0c09e6af19d137ee9c94b2
Author: VGalaxies <[email protected]>
AuthorDate: Wed Jul 2 10:32:10 2025 +0800

    Subscription: add config to enable or disable subscription (#15820) (#15822)
---
 .../it/env/cluster/config/MppCommonConfig.java     |  6 +++++
 .../env/cluster/config/MppSharedCommonConfig.java  |  7 +++++
 .../it/env/remote/config/RemoteCommonConfig.java   |  5 ++++
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |  2 ++
 .../it/cluster/IoTDBSubscriptionRestartIT.java     |  4 +++
 .../it/dual/AbstractSubscriptionDualIT.java        |  4 +++
 .../it/local/AbstractSubscriptionLocalIT.java      |  2 ++
 .../it/triple/AbstractSubscriptionTripleIT.java    |  5 ++++
 .../apache/iotdb/tools/it/ExportTsFileTestIT.java  |  3 +++
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |  1 +
 .../consumer/SubscriptionProvider.java             |  1 +
 .../task/execution/PipeSubtaskExecutorManager.java |  6 ++++-
 .../impl/DataNodeInternalRPCServiceImpl.java       | 26 ++++++++++++++++++
 .../config/executor/ClusterConfigTaskExecutor.java | 31 ++++++++++++++++++++++
 .../agent/SubscriptionReceiverAgent.java           | 16 +++++++++--
 .../agent/SubscriptionRuntimeAgent.java            |  4 +++
 .../apache/iotdb/commons/conf/CommonConfig.java    | 10 +++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  5 ++++
 .../commons/pipe/agent/task/PipeTaskAgent.java     |  8 ++++++
 .../subscription/config/SubscriptionConfig.java    |  6 +++++
 20 files changed, 149 insertions(+), 3 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index f1a820e2112..fe4190d7708 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -502,6 +502,12 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
+    setProperty("subscription_enabled", String.valueOf(subscriptionEnabled));
+    return this;
+  }
+
   @Override
   public CommonConfig setDefaultStorageGroupLevel(int 
defaultStorageGroupLevel) {
     setProperty("default_storage_group_level", 
String.valueOf(defaultStorageGroupLevel));
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index c81680c5230..c2ade6eace0 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -513,6 +513,13 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     return this;
   }
 
+  @Override
+  public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
+    dnConfig.setSubscriptionEnabled(subscriptionEnabled);
+    cnConfig.setSubscriptionEnabled(subscriptionEnabled);
+    return this;
+  }
+
   @Override
   public CommonConfig setDefaultStorageGroupLevel(int 
defaultStorageGroupLevel) {
     dnConfig.setDefaultStorageGroupLevel(defaultStorageGroupLevel);
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 3d57745ed68..581c5f4345e 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -360,4 +360,9 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setQueryMemoryProportion(String queryMemoryProportion) {
     return this;
   }
+
+  @Override
+  public CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index dbba83aed58..16d50954209 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -160,6 +160,8 @@ public interface CommonConfig {
 
   CommonConfig setQueryMemoryProportion(String queryMemoryProportion);
 
+  CommonConfig setSubscriptionEnabled(boolean subscriptionEnabled);
+
   default CommonConfig setDefaultStorageGroupLevel(int 
defaultStorageGroupLevel) {
     return this;
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
index fee93bacdd5..de17dec1ed4 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/cluster/IoTDBSubscriptionRestartIT.java
@@ -71,6 +71,10 @@ public class IoTDBSubscriptionRestartIT extends 
AbstractSubscriptionIT {
   public void setUp() throws Exception {
     super.setUp();
 
+    // enable subscription
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
+
+    // set cluster env
     EnvFactory.getEnv()
         .getConfig()
         .getCommonConfig()
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
index 27d8c3c493f..594f9efe691 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/dual/AbstractSubscriptionDualIT.java
@@ -47,6 +47,10 @@ public abstract class AbstractSubscriptionDualIT extends 
AbstractSubscriptionIT
   }
 
   protected void setUpConfig() {
+    // enable subscription
+    senderEnv.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+    receiverEnv.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+
     // enable auto create schema
     senderEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
     receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
index 5fa7c5808fa..3f415051928 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/local/AbstractSubscriptionLocalIT.java
@@ -32,6 +32,8 @@ public abstract class AbstractSubscriptionLocalIT extends 
AbstractSubscriptionIT
   public void setUp() throws Exception {
     super.setUp();
 
+    // enable subscription
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
index 9622f727908..3b49eb8ed13 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/AbstractSubscriptionTripleIT.java
@@ -55,6 +55,11 @@ public abstract class AbstractSubscriptionTripleIT extends 
AbstractSubscriptionI
   }
 
   protected void setUpConfig() {
+    // enable subscription
+    sender.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+    receiver1.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+    receiver2.getConfig().getCommonConfig().setSubscriptionEnabled(true);
+
     // enable auto create schema
     sender.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
     receiver1.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(true);
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
index 57b76e55e81..6ad0c843e27 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/tools/it/ExportTsFileTestIT.java
@@ -52,7 +52,10 @@ public class ExportTsFileTestIT extends AbstractScriptIT {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    // enable subscription
+    EnvFactory.getEnv().getConfig().getCommonConfig().setSubscriptionEnabled(true);
     EnvFactory.getEnv().initClusterEnvironment();
+
     ip = EnvFactory.getEnv().getIP();
     port = EnvFactory.getEnv().getPort();
     toolsPath = EnvFactory.getEnv().getToolsPath();
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 28569ebd7f2..81f136fadd0 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -273,6 +273,7 @@ public enum TSStatusCode {
   SUBSCRIPTION_MISSING_CUSTOMER(1909),
   SHOW_SUBSCRIPTION_ERROR(1910),
   SUBSCRIPTION_PIPE_TIMEOUT_ERROR(1911),
+  SUBSCRIPTION_NOT_ENABLED_ERROR(1912),
 
   // Topic
   CREATE_TOPIC_ERROR(2000),
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
index 8d50fda6070..03eefeaddba 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionProvider.java
@@ -413,6 +413,7 @@ final class SubscriptionProvider extends 
SubscriptionSession {
       case 1900: // SUBSCRIPTION_VERSION_ERROR
       case 1901: // SUBSCRIPTION_TYPE_ERROR
       case 1909: // SUBSCRIPTION_MISSING_CUSTOMER
+      case 1912: // SUBSCRIPTION_NOT_ENABLED_ERROR
       default:
         {
           final String errorMessage =
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
index eadb75463bb..f0de4d8d58e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/execution/PipeSubtaskExecutorManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.agent.task.execution;
 
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.db.pipe.consensus.PipeConsensusSubtaskExecutor;
 import org.apache.iotdb.db.subscription.task.execution.SubscriptionSubtaskExecutor;
 
@@ -53,7 +54,10 @@ public class PipeSubtaskExecutorManager {
   private PipeSubtaskExecutorManager() {
     processorExecutor = new PipeProcessorSubtaskExecutor();
     connectorExecutor = new PipeConnectorSubtaskExecutor();
-    subscriptionExecutor = new SubscriptionSubtaskExecutor();
+    subscriptionExecutor =
+        SubscriptionConfig.getInstance().getSubscriptionEnabled()
+            ? new SubscriptionSubtaskExecutor()
+            : null;
     consensusExecutor = new PipeConsensusSubtaskExecutor();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 7732814c109..9eb5446f0cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -60,6 +60,7 @@ import org.apache.iotdb.commons.schema.SchemaConstant;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.enums.Tag;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.subscription.meta.consumer.ConsumerGroupMeta;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
@@ -1158,6 +1159,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TPushTopicMetaResp pushTopicMeta(TPushTopicMetaReq req) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return new TPushTopicMetaResp()
+          .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    }
+
     final List<TopicMeta> topicMetas = new ArrayList<>();
     for (ByteBuffer byteBuffer : req.getTopicMetas()) {
       topicMetas.add(TopicMeta.deserialize(byteBuffer));
@@ -1181,6 +1187,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TPushTopicMetaResp pushSingleTopicMeta(TPushSingleTopicMetaReq req) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return new TPushTopicMetaResp()
+          .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    }
+
     try {
       final TPushTopicMetaRespExceptionMessage exceptionMessage;
       if (req.isSetTopicNameToDrop()) {
@@ -1209,6 +1220,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TPushTopicMetaResp pushMultiTopicMeta(TPushMultiTopicMetaReq req) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return new TPushTopicMetaResp()
+          .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    }
+
     boolean hasException = false;
     // If there is any exception, we use the size of exceptionMessages to record the fail index
     List<TPushTopicMetaRespExceptionMessage> exceptionMessages = new 
ArrayList<>();
@@ -1256,6 +1272,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
 
   @Override
   public TPushConsumerGroupMetaResp 
pushConsumerGroupMeta(TPushConsumerGroupMetaReq req) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return new TPushConsumerGroupMetaResp()
+          .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    }
+
     final List<ConsumerGroupMeta> consumerGroupMetas = new ArrayList<>();
     for (ByteBuffer byteBuffer : req.getConsumerGroupMetas()) {
       consumerGroupMetas.add(ConsumerGroupMeta.deserialize(byteBuffer));
@@ -1280,6 +1301,11 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   @Override
   public TPushConsumerGroupMetaResp pushSingleConsumerGroupMeta(
       TPushSingleConsumerGroupMetaReq req) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return new TPushConsumerGroupMetaResp()
+          .setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+    }
+
     try {
       final TPushConsumerGroupMetaRespExceptionMessage exceptionMessage;
       if (req.isSetConsumerGroupNameToDrop()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index abdecf6ba73..c0436818de1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -53,6 +53,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta;
 import 
org.apache.iotdb.commons.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
 import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.commons.subscription.meta.topic.TopicMeta;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.commons.udf.service.UDFClassLoader;
@@ -311,6 +312,16 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     private ClusterConfigTaskExecutorHolder() {}
   }
 
+  private static final SettableFuture<ConfigTaskResult> SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+
+  static {
+    SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE = SettableFuture.create();
+    SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE.setException(
+        new IoTDBException(
+            "Subscription not enabled, please set config `subscription_enabled` to true.",
+            TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR.getStatusCode()));
+  }
+
   public static ClusterConfigTaskExecutor getInstance() {
     return ClusterConfigTaskExecutor.ClusterConfigTaskExecutorHolder.INSTANCE;
   }
@@ -2080,6 +2091,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   @Override
   public SettableFuture<ConfigTaskResult> showSubscriptions(
       ShowSubscriptionsStatement showSubscriptionsStatement) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+    }
+
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
 
     try (final ConfigNodeClient configNodeClient =
@@ -2113,6 +2128,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
   public SettableFuture<ConfigTaskResult> dropSubscription(
       final DropSubscriptionStatement dropSubscriptionStatement) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+    }
+
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try (ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
@@ -2134,6 +2153,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
   @Override
   public SettableFuture<ConfigTaskResult> createTopic(CreateTopicStatement 
createTopicStatement) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+    }
+
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
 
     final String topicName = createTopicStatement.getTopicName();
@@ -2198,6 +2221,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
   @Override
   public SettableFuture<ConfigTaskResult> dropTopic(DropTopicStatement 
dropTopicStatement) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+    }
+
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try (ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
@@ -2221,6 +2248,10 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
 
   @Override
   public SettableFuture<ConfigTaskResult> showTopics(ShowTopicsStatement 
showTopicsStatement) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return SUBSCRIPTION_NOT_ENABLED_ERROR_FUTURE;
+    }
+
     final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
     try (final ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
index 0eb32e21f5d..2e2f27b0a02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionReceiverAgent.java
@@ -42,17 +42,29 @@ public class SubscriptionReceiverAgent {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionReceiverAgent.class);
 
-  private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new 
ThreadLocal<>();
-
   private static final Map<Byte, Supplier<SubscriptionReceiver>> 
RECEIVER_CONSTRUCTORS =
       new HashMap<>();
 
+  private static final TPipeSubscribeResp SUBSCRIPTION_NOT_ENABLED_ERROR_RESP =
+      new TPipeSubscribeResp(
+          RpcUtils.getStatus(
+              TSStatusCode.SUBSCRIPTION_NOT_ENABLED_ERROR,
+              "Subscription not enabled, please set config `subscription_enabled` to true."),
+          PipeSubscribeResponseVersion.VERSION_1.getVersion(),
+          PipeSubscribeResponseType.ACK.getType());
+
+  private final ThreadLocal<SubscriptionReceiver> receiverThreadLocal = new ThreadLocal<>();
+
   SubscriptionReceiverAgent() {
     RECEIVER_CONSTRUCTORS.put(
         PipeSubscribeRequestVersion.VERSION_1.getVersion(), 
SubscriptionReceiverV1::new);
   }
 
   public TPipeSubscribeResp handle(final TPipeSubscribeReq req) {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return SUBSCRIPTION_NOT_ENABLED_ERROR_RESP;
+    }
+
     final byte reqVersion = req.getVersion();
     if (RECEIVER_CONSTRUCTORS.containsKey(reqVersion)) {
       return getReceiver(reqVersion).handle(req);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
index 27c75b62dce..aec16568463 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionRuntimeAgent.java
@@ -62,6 +62,10 @@ public class SubscriptionRuntimeAgent implements IService {
 
   @Override
   public void start() throws StartupException {
+    if (!SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return;
+    }
+
     SubscriptionConfig.getInstance().printAllConfigs();
 
     SubscriptionAgentLauncher.launchSubscriptionTopicAgent();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index 59d348abe5c..038581bf1f5 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -326,6 +326,8 @@ public class CommonConfig {
   private boolean pipeEventReferenceTrackingEnabled = true;
   private long pipeEventReferenceEliminateIntervalSeconds = 10;
 
+  private boolean subscriptionEnabled = false;
+
   private float subscriptionCacheMemoryUsagePercentage = 0.2F;
   private int subscriptionSubtaskExecutorMaxThreadNum = 2;
 
@@ -2150,6 +2152,14 @@ public class CommonConfig {
         pipeEventReferenceEliminateIntervalSeconds);
   }
 
+  public boolean getSubscriptionEnabled() {
+    return subscriptionEnabled;
+  }
+
+  public void setSubscriptionEnabled(boolean subscriptionEnabled) {
+    this.subscriptionEnabled = subscriptionEnabled;
+  }
+
   public float getSubscriptionCacheMemoryUsagePercentage() {
     return subscriptionCacheMemoryUsagePercentage;
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c17723cde3c..0432415637f 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -252,6 +252,11 @@ public class CommonDescriptor {
   }
 
   private void loadSubscriptionProps(TrimProperties properties) {
+    config.setSubscriptionEnabled(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "subscription_enabled", String.valueOf(config.getSubscriptionEnabled()))));
+
     config.setSubscriptionCacheMemoryUsagePercentage(
         Float.parseFloat(
             properties.getProperty(
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
index aeafccc2196..353da8c84e4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/PipeTaskAgent.java
@@ -33,6 +33,7 @@ import 
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTemporaryMetaInAgent;
 import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
 import 
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.connector.limiter.PipeEndPointRateLimiter;
+import org.apache.iotdb.commons.subscription.config.SubscriptionConfig;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
 import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
 import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
@@ -165,6 +166,13 @@ public abstract class PipeTaskAgent {
   private void executeSinglePipeMetaChanges(final PipeMeta metaFromCoordinator)
       throws IllegalPathException {
     final String pipeName = metaFromCoordinator.getStaticMeta().getPipeName();
+
+    // Do nothing with the subscription pipe if disable subscription
+    if (PipeStaticMeta.isSubscriptionPipe(pipeName)
+        && !SubscriptionConfig.getInstance().getSubscriptionEnabled()) {
+      return;
+    }
+
     final PipeMeta metaInAgent = pipeMetaKeeper.getPipeMeta(pipeName);
 
     // If pipe meta does not exist on local agent, create a new pipe
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
index a332b87f0ed..c58ae725888 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/config/SubscriptionConfig.java
@@ -29,6 +29,10 @@ public class SubscriptionConfig {
 
   private static final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
 
+  public boolean getSubscriptionEnabled() {
+    return COMMON_CONFIG.getSubscriptionEnabled();
+  }
+
   public float getSubscriptionCacheMemoryUsagePercentage() {
     return COMMON_CONFIG.getSubscriptionCacheMemoryUsagePercentage();
   }
@@ -138,6 +142,8 @@ public class SubscriptionConfig {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(SubscriptionConfig.class);
 
   public void printAllConfigs() {
+    LOGGER.info("SubscriptionEnabled: {}", getSubscriptionEnabled());
+
     LOGGER.info(
         "SubscriptionCacheMemoryUsagePercentage: {}", 
getSubscriptionCacheMemoryUsagePercentage());
     LOGGER.info(

Reply via email to