This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch mergemaster0808 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3fd8c2eb95018d186c71c85e5494647ded1c6b00 Author: Zhenyu Luo <[email protected]> AuthorDate: Mon Aug 5 12:54:29 2024 +0800 PipePlugin/Subscription: The Drop PipePlugin operation adds a check to see if there is a Topic that uses PipePlugin as a processor (#13048) (cherry picked from commit 97f9ef529421c2eb2aec1ccee92e047622e1c0b4) --- .../persistence/subscription/SubscriptionInfo.java | 31 ++++++++++++++++++++++ .../impl/pipe/plugin/DropPipePluginProcedure.java | 6 +++++ 2 files changed, 37 insertions(+) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java index 6b64331422b..d1071c511af 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java @@ -207,6 +207,37 @@ public class SubscriptionInfo implements SnapshotProcessor { throw new SubscriptionException(exceptionMessage); } + public void validatePipePluginUsageByTopic(String pipePluginName) throws SubscriptionException { + acquireReadLock(); + try { + validatePipePluginUsageByTopicInternal(pipePluginName); + } finally { + releaseReadLock(); + } + } + + public void validatePipePluginUsageByTopicInternal(String pipePluginName) + throws SubscriptionException { + acquireReadLock(); + try { + topicMetaKeeper + .getAllTopicMeta() + .forEach( + meta -> { + if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) { + final String exceptionMessage = + String.format( + "PipePlugin '%s' is already used by Topic '%s' as a processor.", + pipePluginName, meta.getTopicName()); + LOGGER.warn(exceptionMessage); + throw new SubscriptionException(exceptionMessage); + } + }); + } finally { + releaseReadLock(); + } + } + public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws SubscriptionException { acquireReadLock(); try { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java index dc9d4ce4f87..9bb99f8f64c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java @@ -22,7 +22,9 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.plugin; import org.apache.iotdb.confignode.consensus.request.write.pipe.plugin.DropPipePluginPlan; import org.apache.iotdb.confignode.manager.pipe.coordinator.plugin.PipePluginCoordinator; import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator; +import org.apache.iotdb.confignode.manager.subscription.SubscriptionCoordinator; import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo; +import org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo; import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv; import org.apache.iotdb.confignode.procedure.exception.ProcedureException; import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException; @@ -119,9 +121,12 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi env.getConfigManager().getPipeManager().getPipeTaskCoordinator(); final PipePluginCoordinator pipePluginCoordinator = env.getConfigManager().getPipeManager().getPipePluginCoordinator(); + final SubscriptionCoordinator subscriptionCoordinator = + env.getConfigManager().getSubscriptionManager().getSubscriptionCoordinator(); final AtomicReference<PipeTaskInfo> pipeTaskInfo = pipeTaskCoordinator.lock(); pipePluginCoordinator.lock(); + SubscriptionInfo subscriptionInfo = subscriptionCoordinator.getSubscriptionInfo(); try { if (pipePluginCoordinator @@ -137,6 +142,7 @@ public class DropPipePluginProcedure extends AbstractNodeProcedure<DropPipePlugi } pipeTaskInfo.get().validatePipePluginUsageByPipe(pluginName); + subscriptionInfo.validatePipePluginUsageByTopic(pluginName); } catch (PipeException e) { // if the pipe plugin is a built-in plugin, we should not drop it LOGGER.warn(e.getMessage());
