This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 43e562a2ac1 Pipe/Subscription: Add 'Create If Not Exists' and 'Drop If
Exists' Support for Pipes, Plugins, and Topics (#12969)
43e562a2ac1 is described below
commit 43e562a2ac1f032d3d3dd9de3d8acd95742e79b6
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Aug 1 14:33:44 2024 +0800
Pipe/Subscription: Add 'Create If Not Exists' and 'Drop If Exists' Support
for Pipes, Plugins, and Topics (#12969)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../IoTDBPipeConditionalOperationsIT.java | 228 +++++++++++++++++++++
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 14 +-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 9 +
.../iotdb/confignode/manager/ConfigManager.java | 15 +-
.../apache/iotdb/confignode/manager/IManager.java | 11 +-
.../iotdb/confignode/manager/ProcedureManager.java | 13 +-
.../coordinator/plugin/PipePluginCoordinator.java | 12 +-
.../pipe/coordinator/task/PipeTaskCoordinator.java | 11 +-
.../subscription/SubscriptionCoordinator.java | 18 +-
.../persistence/pipe/PipePluginInfo.java | 26 ++-
.../confignode/persistence/pipe/PipeTaskInfo.java | 26 ++-
.../persistence/subscription/SubscriptionInfo.java | 12 +-
.../impl/pipe/AbstractOperatePipeProcedureV2.java | 6 +-
.../pipe/plugin/CreatePipePluginProcedure.java | 28 ++-
.../impl/pipe/plugin/DropPipePluginProcedure.java | 22 +-
.../runtime/PipeHandleLeaderChangeProcedure.java | 2 +-
.../runtime/PipeHandleMetaChangeProcedure.java | 2 +-
.../impl/pipe/runtime/PipeMetaSyncProcedure.java | 2 +-
.../impl/pipe/task/AlterPipeProcedureV2.java | 6 +-
.../impl/pipe/task/CreatePipeProcedureV2.java | 15 +-
.../impl/pipe/task/DropPipeProcedureV2.java | 2 +-
.../impl/pipe/task/StartPipeProcedureV2.java | 4 +-
.../impl/pipe/task/StopPipeProcedureV2.java | 2 +-
.../AbstractOperateSubscriptionProcedure.java | 15 +-
.../consumer/AlterConsumerGroupProcedure.java | 3 +-
.../runtime/ConsumerGroupMetaSyncProcedure.java | 3 +-
.../subscription/CreateSubscriptionProcedure.java | 3 +-
.../subscription/DropSubscriptionProcedure.java | 3 +-
.../subscription/topic/AlterTopicProcedure.java | 4 +-
.../subscription/topic/CreateTopicProcedure.java | 7 +-
.../subscription/topic/DropTopicProcedure.java | 3 +-
.../topic/runtime/TopicMetaSyncProcedure.java | 3 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 20 +-
.../iotdb/confignode/persistence/PipeInfoTest.java | 2 +-
.../pipe/plugin/CreatePipePluginProcedureTest.java | 2 +-
.../pipe/plugin/DropPipePluginProcedureTest.java | 2 +-
.../iotdb/db/protocol/client/ConfigNodeClient.java | 14 ++
.../config/executor/ClusterConfigTaskExecutor.java | 37 +++-
.../config/executor/IConfigTaskExecutor.java | 3 +-
.../config/metadata/DropPipePluginTask.java | 6 +-
.../db/queryengine/plan/parser/ASTVisitor.java | 20 +-
.../metadata/pipe/AlterPipeStatement.java | 9 +
.../metadata/pipe/CreatePipePluginStatement.java | 9 +-
.../metadata/pipe/CreatePipeStatement.java | 9 +
.../metadata/pipe/DropPipePluginStatement.java | 18 +-
.../statement/metadata/pipe/DropPipeStatement.java | 9 +
.../subscription/CreateTopicStatement.java | 10 +-
.../metadata/subscription/DropTopicStatement.java | 9 +
.../src/main/thrift/confignode.thrift | 21 ++
50 files changed, 632 insertions(+), 99 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java
new file mode 100644
index 00000000000..e6dfd37a307
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeConditionalOperationsIT.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.autocreate;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2AutoCreateSchema.class})
+public class IoTDBPipeConditionalOperationsIT extends AbstractPipeDualAutoIT {
+
+ @Test
+ public void testBasicCreatePipeIfNotExists() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create pipe
+ String sql =
+ String.format(
+ "create pipe If Not Exists a2b with source
('source'='iotdb-source', 'source.pattern'='root.test1',
'source.realtime.mode'='stream') with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ long creationTime;
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // Check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // Record last creation time
+ creationTime = showPipeResult.get(0).creationTime;
+ }
+
+ // Create pipe If Not Exists
+ sql =
+ String.format(
+ "create pipe If Not Exists a2b with source
('source'='iotdb-source', 'source.path'='root.test2.**') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // Check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test2.**"));
+ Assert.assertEquals(creationTime, showPipeResult.get(0).creationTime);
+ }
+ }
+
+ @Test
+ public void testBasicDropPipeIfExists() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Create pipe
+ final String sql =
+ String.format(
+ "create pipe If Not Exists a2b with source
('source'='iotdb-source', 'source.path'='root.test1.**') with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // Drop pipe If Exists
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("Drop pipe If Exists a2b");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(0, showPipeResult.size());
+ }
+
+ // Drop pipe If Exists
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("Drop pipe If Exists a2b");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void testBasicAlterPipeIfExists() throws Exception {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Alter pipe If Exists
+ String sql =
+ String.format(
+ "Alter pipe If Exists a2b replace sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(0, showPipeResult.size());
+ }
+
+ // Create pipe
+ sql =
+ String.format(
+ "create pipe If Not Exists a2b with source
('source'='iotdb-source', 'source.pattern'='root.test1',
'source.realtime.mode'='stream') with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // Alter pipe If Exists
+ sql =
+ String.format(
+ "Alter pipe If Exists a2b replace source () replace processor ()
replace sink ('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // Check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // Check configurations
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+ Assert.assertFalse(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+ Assert.assertFalse(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ }
+ }
+}
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 20fba208d4f..80e31e2469d 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -250,6 +250,7 @@ public enum TSStatusCode {
ALTER_TOPIC_ERROR(2002),
SHOW_TOPIC_ERROR(2003),
TOPIC_PUSH_META_ERROR(2004),
+ TOPIC_NOT_EXIST_ERROR(2005),
// Consumer
CREATE_CONSUMER_ERROR(2100),
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index e66c875c9a7..8986fce18e1 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -535,7 +535,7 @@ verifyConnection
// Pipe Task
=========================================================================================
createPipe
- : CREATE PIPE pipeName=identifier
+ : CREATE PIPE (IF NOT EXISTS)? pipeName=identifier
extractorAttributesClause?
processorAttributesClause?
connectorAttributesClause
@@ -575,7 +575,7 @@ connectorAttributeClause
;
alterPipe
- : ALTER PIPE pipeName=identifier
+ : ALTER PIPE (IF EXISTS)? pipeName=identifier
alterExtractorAttributesClause?
alterProcessorAttributesClause?
alterConnectorAttributesClause?
@@ -603,7 +603,7 @@ alterConnectorAttributesClause
;
dropPipe
- : DROP PIPE pipeName=identifier
+ : DROP PIPE (IF EXISTS)? pipeName=identifier
;
startPipe
@@ -620,11 +620,11 @@ showPipes
// Pipe Plugin
=========================================================================================
createPipePlugin
- : CREATE PIPEPLUGIN pluginName=identifier AS className=STRING_LITERAL
uriClause
+ : CREATE PIPEPLUGIN (IF NOT EXISTS)? pluginName=identifier AS
className=STRING_LITERAL uriClause
;
dropPipePlugin
- : DROP PIPEPLUGIN pluginName=identifier
+ : DROP PIPEPLUGIN (IF EXISTS)? pluginName=identifier
;
showPipePlugins
@@ -633,7 +633,7 @@ showPipePlugins
// Topic
=========================================================================================
createTopic
- : CREATE TOPIC topicName=identifier topicAttributesClause?
+ : CREATE TOPIC (IF NOT EXISTS)? topicName=identifier topicAttributesClause?
;
topicAttributesClause
@@ -645,7 +645,7 @@ topicAttributeClause
;
dropTopic
- : DROP TOPIC topicName=identifier
+ : DROP TOPIC (IF EXISTS)? topicName=identifier
;
showTopics
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index d1865a1e204..dc9c12c7c8c 100644
--- a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -286,6 +286,10 @@ EVERY
: E V E R Y
;
+EXISTS
+ : E X I S T S
+ ;
+
EXPLAIN
: E X P L A I N
;
@@ -942,10 +946,15 @@ ELSE
: E L S E
;
+IF
+ : I F
+ ;
+
INF
: I N F
;
+
// Privileges Keywords
PRIVILEGE_VALUE
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 0f8271d27de..59dacc5ad5c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -146,6 +146,9 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -1441,10 +1444,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus dropPipePlugin(String pipePluginName) {
+ public TSStatus dropPipePlugin(TDropPipePluginReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? pipeManager.getPipePluginCoordinator().dropPipePlugin(pipePluginName)
+ ? pipeManager.getPipePluginCoordinator().dropPipePlugin(req)
: status;
}
@@ -2026,10 +2029,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus dropPipe(String pipeName) {
+ public TSStatus dropPipe(TDropPipeReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? pipeManager.getPipeTaskCoordinator().dropPipe(pipeName)
+ ? pipeManager.getPipeTaskCoordinator().dropPipe(req)
: status;
}
@@ -2058,10 +2061,10 @@ public class ConfigManager implements IManager {
}
@Override
- public TSStatus dropTopic(String topicName) {
+ public TSStatus dropTopic(TDropTopicReq req) {
TSStatus status = confirmLeader();
return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
- ? subscriptionManager.getSubscriptionCoordinator().dropTopic(topicName)
+ ? subscriptionManager.getSubscriptionCoordinator().dropTopic(req)
: status;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 3df54d41cf7..366be81c9a3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -77,6 +77,9 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteDatabasesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -464,7 +467,7 @@ public interface IManager {
TSStatus createPipePlugin(TCreatePipePluginReq req);
/** Drop pipe plugin. */
- TSStatus dropPipePlugin(String pluginName);
+ TSStatus dropPipePlugin(TDropPipePluginReq req);
/** Show pipe plugins. */
TGetPipePluginTableResp getPipePluginTable();
@@ -631,11 +634,11 @@ public interface IManager {
/**
* Drop Pipe.
*
- * @param pipeName name of Pipe
+ * @param req Info about Pipe
* @return {@link TSStatusCode#SUCCESS_STATUS} if dropped the pipe
successfully, {@link
* TSStatusCode#PIPE_ERROR} if encountered failure.
*/
- TSStatus dropPipe(String pipeName);
+ TSStatus dropPipe(TDropPipeReq req);
/**
* Get Pipe by name. If pipeName is empty, get all Pipe.
@@ -670,7 +673,7 @@ public interface IManager {
TSStatus createTopic(TCreateTopicReq topic);
/** Drop Topic. */
- TSStatus dropTopic(String topicName);
+ TSStatus dropTopic(TDropTopicReq req);
/** Show Topic. */
TShowTopicResp showTopic(TShowTopicReq req);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 2a14d9445ad..c77af983757 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -106,6 +106,7 @@ import
org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
@@ -799,9 +800,10 @@ public class ProcedureManager {
return statusList.get(0);
}
- public TSStatus createPipePlugin(PipePluginMeta pipePluginMeta, byte[]
jarFile) {
+ public TSStatus createPipePlugin(
+ PipePluginMeta pipePluginMeta, byte[] jarFile, boolean
isSetIfNotExistsCondition) {
final CreatePipePluginProcedure createPipePluginProcedure =
- new CreatePipePluginProcedure(pipePluginMeta, jarFile);
+ new CreatePipePluginProcedure(pipePluginMeta, jarFile,
isSetIfNotExistsCondition);
try {
if (jarFile != null
&& new
UpdateProcedurePlan(createPipePluginProcedure).getSerializedSize()
@@ -829,8 +831,11 @@ public class ProcedureManager {
}
}
- public TSStatus dropPipePlugin(String pluginName) {
- final long procedureId = executor.submitProcedure(new
DropPipePluginProcedure(pluginName));
+ public TSStatus dropPipePlugin(TDropPipePluginReq req) {
+ final long procedureId =
+ executor.submitProcedure(
+ new DropPipePluginProcedure(
+ req.getPluginName(), req.isSetIfExistsCondition() &&
req.isIfExistsCondition()));
final List<TSStatus> statusList = new ArrayList<>();
final boolean isSucceed =
waitingProcedureFinished(Collections.singletonList(procedureId),
statusList);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
index 5c0ad6492e6..07259f842fb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/plugin/PipePluginCoordinator.java
@@ -28,6 +28,7 @@ import
org.apache.iotdb.confignode.consensus.response.pipe.plugin.PipePluginTabl
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipePluginInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetJarInListResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
@@ -72,11 +73,16 @@ public class PipePluginCoordinator {
final PipePluginMeta pipePluginMeta =
new PipePluginMeta(pluginName, className, false, jarName, jarMD5);
- return
configManager.getProcedureManager().createPipePlugin(pipePluginMeta,
req.getJarFile());
+ return configManager
+ .getProcedureManager()
+ .createPipePlugin(
+ pipePluginMeta,
+ req.getJarFile(),
+ req.isSetIfNotExistsCondition() && req.isIfNotExistsCondition());
}
- public TSStatus dropPipePlugin(String pluginName) {
- return configManager.getProcedureManager().dropPipePlugin(pluginName);
+ public TSStatus dropPipePlugin(TDropPipePluginReq req) {
+ return configManager.getProcedureManager().dropPipePlugin(req);
}
public TGetPipePluginTableResp getPipePluginTable() {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
index b07203e43ed..100da334d35 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/task/PipeTaskCoordinator.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.pipe.PipeTaskInfo;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeResp;
@@ -160,13 +161,19 @@ public class PipeTaskCoordinator {
}
/** Caller should ensure that the method is called in the lock {@link
#lock()}. */
- public TSStatus dropPipe(String pipeName) {
+ public TSStatus dropPipe(TDropPipeReq req) {
+ final String pipeName = req.getPipeName();
final boolean isPipeExistedBeforeDrop =
pipeTaskInfo.isPipeExisted(pipeName);
final TSStatus status =
configManager.getProcedureManager().dropPipe(pipeName);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to drop pipe {}. Result status: {}.", pipeName,
status);
}
- return isPipeExistedBeforeDrop
+
+ final boolean isSetIfExistsCondition =
+ req.isSetIfExistsCondition() && req.isIfExistsCondition();
+ // If the `IF EXISTS` condition is not set and the pipe does not exist
before the delete
+ // operation, return an error status indicating that the pipe does not
exist.
+ return isPipeExistedBeforeDrop || isSetIfExistsCondition
? status
: RpcUtils.getStatus(
TSStatusCode.PIPE_NOT_EXIST_ERROR,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
index c48f3e0f078..20dfce44bf2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/subscription/SubscriptionCoordinator.java
@@ -30,6 +30,7 @@ import
org.apache.iotdb.confignode.persistence.subscription.SubscriptionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TCloseConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateConsumerReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTopicReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllTopicInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq;
@@ -38,6 +39,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowTopicResp;
import org.apache.iotdb.confignode.rpc.thrift.TSubscribeReq;
import org.apache.iotdb.confignode.rpc.thrift.TUnsubscribeReq;
+import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
@@ -149,12 +151,24 @@ public class SubscriptionCoordinator {
return status;
}
- public TSStatus dropTopic(String topicName) {
+ public TSStatus dropTopic(TDropTopicReq req) {
+ final String topicName = req.getTopicName();
+ final boolean isTopicExistedBeforeDrop =
subscriptionInfo.isTopicExisted(topicName);
final TSStatus status =
configManager.getProcedureManager().dropTopic(topicName);
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
LOGGER.warn("Failed to drop topic {}. Result status: {}.", topicName,
status);
}
- return status;
+
+ // If the `IF EXISTS` condition is not set and the topic does not exist
before the drop
+ // operation, return an error status indicating that the topic does not
exist.
+ final boolean isIfExistedConditionSet =
+ req.isSetIfExistsCondition() && req.isIfExistsCondition();
+ return isTopicExistedBeforeDrop || isIfExistedConditionSet
+ ? status
+ : RpcUtils.getStatus(
+ TSStatusCode.TOPIC_NOT_EXIST_ERROR,
+ String.format(
+ "Failed to drop topic %s. Failures: %s does not exist.",
topicName, topicName));
}
public TShowTopicResp showTopic(TShowTopicReq req) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
index 1e4a81cfe44..64d235e7f23 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipePluginInfo.java
@@ -93,19 +93,38 @@ public class PipePluginInfo implements SnapshotProcessor {
/////////////////////////////// Validator ///////////////////////////////
- public void validateBeforeCreatingPipePlugin(
- final String pluginName, final String jarName, final String jarMD5) {
+ /**
+ * @return true if the pipe plugin is already created and the
isSetIfNotExistsCondition is true,
+ * false otherwise
+ * @throws PipeException if the pipe plugin is already created and the
isSetIfNotExistsCondition
+ * is false
+ */
+ public boolean validateBeforeCreatingPipePlugin(
+ final String pluginName, final boolean isSetIfNotExistsCondition) {
// both build-in and user defined pipe plugin should be unique
if (pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+ if (isSetIfNotExistsCondition) {
+ return true;
+ }
throw new PipeException(
String.format(
"Failed to create PipePlugin [%s], the same name PipePlugin has
been created",
pluginName));
}
+ return false;
}
- public void validateBeforeDroppingPipePlugin(final String pluginName) {
+ /**
+ * @return true if the pipe plugin is not created and the
isSetIfExistsCondition is true, false
+ * otherwise
+ * @throws PipeException if the pipe plugin is not created and the
isSetIfExistsCondition is false
+ */
+ public boolean validateBeforeDroppingPipePlugin(
+ final String pluginName, final boolean isSetIfExistsCondition) {
if (!pipePluginMetaKeeper.containsPipePlugin(pluginName)) {
+ if (isSetIfExistsCondition) {
+ return true;
+ }
throw new PipeException(
String.format(
"Failed to drop PipePlugin [%s], this PipePlugin has not been
created", pluginName));
@@ -116,6 +135,7 @@ public class PipePluginInfo implements SnapshotProcessor {
"Failed to drop PipePlugin [%s], the PipePlugin is a built-in
PipePlugin",
pluginName));
}
+ return false;
}
public boolean isJarNeededToBeSavedWhenCreatingPipePlugin(final String
jarName) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index c5a264c047a..ddc6ae6ac60 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -155,19 +155,25 @@ public class PipeTaskInfo implements SnapshotProcessor {
/////////////////////////////// Validator ///////////////////////////////
- public void checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest)
throws PipeException {
+ public boolean checkBeforeCreatePipe(final TCreatePipeReq createPipeRequest)
+ throws PipeException {
acquireReadLock();
try {
- checkBeforeCreatePipeInternal(createPipeRequest);
+ return checkBeforeCreatePipeInternal(createPipeRequest);
} finally {
releaseReadLock();
}
}
- private void checkBeforeCreatePipeInternal(final TCreatePipeReq
createPipeRequest)
+ private boolean checkBeforeCreatePipeInternal(final TCreatePipeReq
createPipeRequest)
throws PipeException {
if (!isPipeExisted(createPipeRequest.getPipeName())) {
- return;
+ return true;
+ }
+
+ if (createPipeRequest.isSetIfNotExistsCondition()
+ && createPipeRequest.isIfNotExistsCondition()) {
+ return false;
}
final String exceptionMessage =
@@ -178,19 +184,23 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- public void checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq
alterPipeRequest)
+ public boolean checkAndUpdateRequestBeforeAlterPipe(final TAlterPipeReq
alterPipeRequest)
throws PipeException {
acquireReadLock();
try {
- checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
+ return checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
} finally {
releaseReadLock();
}
}
- private void checkAndUpdateRequestBeforeAlterPipeInternal(final
TAlterPipeReq alterPipeRequest)
+ private boolean checkAndUpdateRequestBeforeAlterPipeInternal(final
TAlterPipeReq alterPipeRequest)
throws PipeException {
if (!isPipeExisted(alterPipeRequest.getPipeName())) {
+ if (alterPipeRequest.isSetIfExistsCondition() &&
alterPipeRequest.isIfExistsCondition()) {
+ return false;
+ }
+
final String exceptionMessage =
String.format(
"Failed to alter pipe %s, the pipe does not exist",
alterPipeRequest.getPipeName());
@@ -254,6 +264,8 @@ public class PipeTaskInfo implements SnapshotProcessor {
.getAttribute());
}
}
+
+ return true;
}
public void checkBeforeStartPipe(final String pipeName) throws PipeException
{
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
index b020ffc045d..6b64331422b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java
@@ -142,20 +142,24 @@ public class SubscriptionInfo implements
SnapshotProcessor {
/////////////////////////////// Topic ///////////////////////////////
- public void validateBeforeCreatingTopic(TCreateTopicReq createTopicReq)
+ public boolean validateBeforeCreatingTopic(TCreateTopicReq createTopicReq)
throws SubscriptionException {
acquireReadLock();
try {
- checkBeforeCreateTopicInternal(createTopicReq);
+ return checkBeforeCreateTopicInternal(createTopicReq);
} finally {
releaseReadLock();
}
}
- private void checkBeforeCreateTopicInternal(TCreateTopicReq createTopicReq)
+ private boolean checkBeforeCreateTopicInternal(TCreateTopicReq
createTopicReq)
throws SubscriptionException {
if (!isTopicExisted(createTopicReq.getTopicName())) {
- return;
+ return true;
+ }
+
+ if (createTopicReq.isSetIfNotExistsCondition() &&
createTopicReq.isIfNotExistsCondition()) {
+ return false;
}
final String exceptionMessage =
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
index 8decdd0b4da..325832a3869 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/AbstractOperatePipeProcedureV2.java
@@ -185,7 +185,7 @@ public abstract class AbstractOperatePipeProcedureV2
/**
* Execute at state {@link OperatePipeTaskState#VALIDATE_TASK}.
*
- * @return true if this procedure can skip subsequent stages (start RUNNING
pipe or stop STOPPED
+ * @return false if this procedure can skip subsequent stages (start RUNNING
pipe or stop STOPPED
* pipe without runtime exception)
* @throws PipeException if validation for pipe parameters failed
*/
@@ -224,8 +224,8 @@ public abstract class AbstractOperatePipeProcedureV2
try {
switch (state) {
case VALIDATE_TASK:
- if (executeFromValidateTask(env)) {
- LOGGER.warn("ProcedureId {}: {}", getProcId(),
SKIP_PIPE_PROCEDURE_MESSAGE);
+ if (!executeFromValidateTask(env)) {
+ LOGGER.info("ProcedureId {}: {}", getProcId(),
SKIP_PIPE_PROCEDURE_MESSAGE);
// On client side, the message returned after the successful
execution of the pipe
// command corresponding to this procedure is "Msg: The statement
is executed
// successfully."
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
index 53fc302c663..fd485079890 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedure.java
@@ -66,14 +66,21 @@ public class CreatePipePluginProcedure extends
AbstractNodeProcedure<CreatePipeP
private PipePluginMeta pipePluginMeta;
private byte[] jarFile;
+ // This field will not be serialized. It may cause some problems
+ // when the procedure fails on one node and recovers on another node.
+ // Though it is not a good practice, it is acceptable here.
+ private boolean isSetIfNotExistsCondition;
+
public CreatePipePluginProcedure() {
super();
}
- public CreatePipePluginProcedure(PipePluginMeta pipePluginMeta, byte[]
jarFile) {
+ public CreatePipePluginProcedure(
+ PipePluginMeta pipePluginMeta, byte[] jarFile, boolean
isSetIfNotExistsCondition) {
super();
this.pipePluginMeta = pipePluginMeta;
this.jarFile = jarFile;
+ this.isSetIfNotExistsCondition = isSetIfNotExistsCondition;
}
@Override
@@ -125,20 +132,25 @@ public class CreatePipePluginProcedure extends
AbstractNodeProcedure<CreatePipeP
env.getConfigManager().getPipeManager().getPipePluginCoordinator();
pipePluginCoordinator.lock();
+ final String pluginName = pipePluginMeta.getPluginName();
try {
- pipePluginCoordinator
+ if (pipePluginCoordinator
.getPipePluginInfo()
- .validateBeforeCreatingPipePlugin(
- pipePluginMeta.getPluginName(),
- pipePluginMeta.getJarName(),
- pipePluginMeta.getJarMD5());
+ .validateBeforeCreatingPipePlugin(pluginName,
isSetIfNotExistsCondition)) {
+ LOGGER.info(
+ "Pipe plugin {} is already created and isSetIfNotExistsCondition
is true, end the CreatePipePluginProcedure({})",
+ pluginName,
+ pluginName);
+ pipePluginCoordinator.unlock();
+ return Flow.NO_MORE_STATE;
+ }
} catch (PipeException e) {
// The pipe plugin has already created, we should end the procedure
LOGGER.warn(
"Pipe plugin {} is already created, end the
CreatePipePluginProcedure({})",
- pipePluginMeta.getPluginName(),
- pipePluginMeta.getPluginName());
+ pluginName,
+ pluginName);
setFailure(new ProcedureException(e.getMessage()));
pipePluginCoordinator.unlock();
return Flow.NO_MORE_STATE;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
index 2502a56ab47..dc9d4ce4f87 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedure.java
@@ -62,13 +62,20 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
private String pluginName;
+ // If the plugin does not exist and the If Exists condition is met, the
process ends.
+ // This field will not be serialized. It may cause some problems
+ // when the procedure fails on one node and recovers on another node.
+ // Though it is not a good practice, it is acceptable here.
+ private boolean isSetIfExistsCondition;
+
public DropPipePluginProcedure() {
super();
}
- public DropPipePluginProcedure(String pluginName) {
+ public DropPipePluginProcedure(String pluginName, boolean
isSetIfExistsCondition) {
super();
this.pluginName = pluginName;
+ this.isSetIfExistsCondition = isSetIfExistsCondition;
}
@Override
@@ -117,7 +124,18 @@ public class DropPipePluginProcedure extends
AbstractNodeProcedure<DropPipePlugi
pipePluginCoordinator.lock();
try {
-
pipePluginCoordinator.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+ if (pipePluginCoordinator
+ .getPipePluginInfo()
+ .validateBeforeDroppingPipePlugin(pluginName,
isSetIfExistsCondition)) {
+ LOGGER.info(
+ "Pipe plugin {} is not exist, end the DropPipePluginProcedure({})",
+ pluginName,
+ pluginName);
+ pipePluginCoordinator.unlock();
+ pipeTaskCoordinator.unlock();
+ return Flow.NO_MORE_STATE;
+ }
+
pipeTaskInfo.get().validatePipePluginUsageByPipe(pluginName);
} catch (PipeException e) {
// if the pipe plugin is a built-in plugin, we should not drop it
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
index 0803aa29ca8..f18737e8b7a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleLeaderChangeProcedure.java
@@ -71,7 +71,7 @@ public class PipeHandleLeaderChangeProcedure extends
AbstractOperatePipeProcedur
LOGGER.info("PipeHandleLeaderChangeProcedure: executeFromValidateTask");
// Nothing needs to be checked
- return false;
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
index 0919894f0d5..e91a6ba974d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeHandleMetaChangeProcedure.java
@@ -81,7 +81,7 @@ public class PipeHandleMetaChangeProcedure extends
AbstractOperatePipeProcedureV
LOGGER.info("PipeHandleMetaChangeProcedure: executeFromValidateTask");
// Do nothing
- return false;
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
index 8b1991aeb88..c4907d1f9e7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/runtime/PipeMetaSyncProcedure.java
@@ -94,7 +94,7 @@ public class PipeMetaSyncProcedure extends
AbstractOperatePipeProcedureV2 {
LOGGER.info("PipeMetaSyncProcedure: executeFromValidateTask");
LAST_EXECUTION_TIME.set(System.currentTimeMillis());
- return false;
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index bfd35a3bc96..59a04f9a806 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -96,7 +96,9 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
// We should execute checkBeforeAlterPipe before checking the pipe plugin.
This method will
// update the alterPipeRequest based on the alterPipeRequest and existing
pipe metadata.
- pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
+ if
(!pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest)) {
+ return false;
+ }
final PipeManager pipeManager = env.getConfigManager().getPipeManager();
pipeManager
@@ -107,7 +109,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
alterPipeRequest.getProcessorAttributes(),
alterPipeRequest.getConnectorAttributes());
- return false;
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
index 68075b5911d..cd581d01332 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/CreatePipeProcedureV2.java
@@ -42,6 +42,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.PipeTaskOperation;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.pipe.api.PipePlugin;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -110,6 +111,17 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
return PipeTaskOperation.CREATE_PIPE;
}
+ /**
+ * Check the {@link PipePlugin} configuration in Pipe. If there is an error,
throw {@link
+ * PipeException}. If there is a Pipe with the same name and there is no
IfNotExists condition in
+ * {@link #createPipeRequest}, throw {@link PipeException}. If there is an
IfNotExists condition,
+ * return {@code false}. If there is no Pipe with the same name, return
{@code true}.
+ *
+ * @param env the environment for the procedure
+ * @return {@code true} The pipeline does not exist {@code false} The
pipeline already exists and
+ * satisfies the IfNotExists condition
+ * @throws PipeException
+ */
@Override
public boolean executeFromValidateTask(ConfigNodeProcedureEnv env) throws
PipeException {
LOGGER.info(
@@ -123,9 +135,8 @@ public class CreatePipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
createPipeRequest.getExtractorAttributes(),
createPipeRequest.getProcessorAttributes(),
createPipeRequest.getConnectorAttributes());
- pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
- return false;
+ return pipeTaskInfo.get().checkBeforeCreatePipe(createPipeRequest);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
index 60d851a66e3..0c7042caf3f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/DropPipeProcedureV2.java
@@ -78,7 +78,7 @@ public class DropPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
pipeTaskInfo.get().checkBeforeDropPipe(pipeName);
- return false;
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
index 628935a2116..58254dc1ab0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StartPipeProcedureV2.java
@@ -65,8 +65,8 @@ public class StartPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
pipeTaskInfo.get().checkBeforeStartPipe(pipeName);
- return pipeTaskInfo.get().isPipeRunning(pipeName)
- && !pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
+ return !pipeTaskInfo.get().isPipeRunning(pipeName)
+ || pipeTaskInfo.get().isStoppedByRuntimeException(pipeName);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
index 81a70733363..a817e16c7ae 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/StopPipeProcedureV2.java
@@ -65,7 +65,7 @@ public class StopPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
pipeTaskInfo.get().checkBeforeStopPipe(pipeName);
- return pipeTaskInfo.get().isPipeStoppedByUser(pipeName);
+ return !pipeTaskInfo.get().isPipeStoppedByUser(pipeName);
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
index 7b0ca8a3d25..4566738cd8d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/AbstractOperateSubscriptionProcedure.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -53,6 +54,9 @@ public abstract class AbstractOperateSubscriptionProcedure
private static final Logger LOGGER =
LoggerFactory.getLogger(AbstractOperateSubscriptionProcedure.class);
+ private static final String SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE =
+ "Skip subscription-related operations and do nothing";
+
private static final int RETRY_THRESHOLD = 1;
protected AtomicReference<SubscriptionInfo> subscriptionInfo;
@@ -156,7 +160,7 @@ public abstract class AbstractOperateSubscriptionProcedure
protected abstract SubscriptionOperation getOperation();
- protected abstract void executeFromValidate(ConfigNodeProcedureEnv env)
+ protected abstract boolean executeFromValidate(ConfigNodeProcedureEnv env)
throws SubscriptionException;
protected abstract void
executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env)
@@ -179,7 +183,14 @@ public abstract class AbstractOperateSubscriptionProcedure
try {
switch (state) {
case VALIDATE:
- executeFromValidate(env);
+ if (!executeFromValidate(env)) {
+ LOGGER.info("ProcedureId {}: {}", getProcId(),
SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE);
+ // On client side, the message returned after the successful
execution of the
+ // subscription command corresponding to this procedure is "Msg:
The statement is
+ // executed successfully."
+
this.setResult(SKIP_SUBSCRIPTION_PROCEDURE_MESSAGE.getBytes(StandardCharsets.UTF_8));
+ return Flow.NO_MORE_STATE;
+ }
setNextState(OperateSubscriptionState.OPERATE_ON_CONFIG_NODES);
break;
case OPERATE_ON_CONFIG_NODES:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
index 981e0513d04..6e6031b8102 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/AlterConsumerGroupProcedure.java
@@ -75,10 +75,11 @@ public class AlterConsumerGroupProcedure extends
AbstractOperateSubscriptionProc
}
@Override
- public void executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
+ public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
LOGGER.info("AlterConsumerGroupProcedure: executeFromValidate, try to
validate");
validateAndGetOldAndNewMeta(env);
+ return true;
}
protected void validateAndGetOldAndNewMeta(ConfigNodeProcedureEnv env) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
index 6e3a5387479..bb010eaca40 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/consumer/runtime/ConsumerGroupMetaSyncProcedure.java
@@ -91,10 +91,11 @@ public class ConsumerGroupMetaSyncProcedure extends
AbstractOperateSubscriptionP
}
@Override
- public void executeFromValidate(ConfigNodeProcedureEnv env) {
+ public boolean executeFromValidate(ConfigNodeProcedureEnv env) {
LOGGER.info("ConsumerGroupMetaSyncProcedure: executeFromValidate");
LAST_EXECUTION_TIME.set(System.currentTimeMillis());
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
index 0894feb62ad..166f7b3da5e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/CreateSubscriptionProcedure.java
@@ -86,7 +86,7 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
}
@Override
- protected void executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
+ protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
LOGGER.info("CreateSubscriptionProcedure: executeFromValidate");
subscriptionInfo.get().validateBeforeSubscribe(subscribeReq);
@@ -131,6 +131,7 @@ public class CreateSubscriptionProcedure extends
AbstractOperateSubscriptionAndP
createPipeProcedure.executeFromValidateTask(env);
createPipeProcedure.executeFromCalculateInfoForTask(env);
}
+ return true;
}
// TODO: check periodically if the subscription is still valid but no
working pipe?
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
index 89da284ac0d..7d0c1e09cbd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/subscription/DropSubscriptionProcedure.java
@@ -87,7 +87,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
}
@Override
- protected void executeFromValidate(final ConfigNodeProcedureEnv env)
+ protected boolean executeFromValidate(final ConfigNodeProcedureEnv env)
throws SubscriptionException {
LOGGER.info("DropSubscriptionProcedure: executeFromValidate");
@@ -133,6 +133,7 @@ public class DropSubscriptionProcedure extends
AbstractOperateSubscriptionAndPip
// Validate AlterConsumerGroupProcedure
alterConsumerGroupProcedure.executeFromValidate(env);
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
index 93380ed10be..f8fbdab72e0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/AlterTopicProcedure.java
@@ -84,12 +84,14 @@ public class AlterTopicProcedure extends
AbstractOperateSubscriptionProcedure {
}
@Override
- public void executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
+ public boolean executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
LOGGER.info("AlterTopicProcedure: executeFromValidate");
subscriptionInfo.get().validateBeforeAlteringTopic(updatedTopicMeta);
existedTopicMeta =
subscriptionInfo.get().getTopicMeta(updatedTopicMeta.getTopicName());
+
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index ed3d59bd3d4..b712861cb54 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -66,11 +66,13 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
}
@Override
- protected void executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
+ protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
LOGGER.info("CreateTopicProcedure: executeFromValidate");
// 1. check if the topic exists
- subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq);
+ if (!subscriptionInfo.get().validateBeforeCreatingTopic(createTopicReq)) {
+ return false;
+ }
// 2. create the topic meta
topicMeta =
@@ -78,6 +80,7 @@ public class CreateTopicProcedure extends
AbstractOperateSubscriptionProcedure {
createTopicReq.getTopicName(),
System.currentTimeMillis(),
createTopicReq.getTopicAttributes());
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
index 54c6e3ed469..f1cfbb59d10 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/DropTopicProcedure.java
@@ -61,10 +61,11 @@ public class DropTopicProcedure extends
AbstractOperateSubscriptionProcedure {
}
@Override
- protected void executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
+ protected boolean executeFromValidate(ConfigNodeProcedureEnv env) throws
SubscriptionException {
LOGGER.info("DropTopicProcedure: executeFromValidate({})", topicName);
subscriptionInfo.get().validateBeforeDroppingTopic(topicName);
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
index 3d49a766102..40920e43936 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/runtime/TopicMetaSyncProcedure.java
@@ -90,10 +90,11 @@ public class TopicMetaSyncProcedure extends
AbstractOperateSubscriptionProcedure
}
@Override
- public void executeFromValidate(ConfigNodeProcedureEnv env) {
+ public boolean executeFromValidate(ConfigNodeProcedureEnv env) {
LOGGER.info("TopicMetaSyncProcedure: executeFromValidate");
LAST_EXECUTION_TIME.set(System.currentTimeMillis());
+ return true;
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 51b5f5c89f8..f1a09ad088a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -115,6 +115,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -808,7 +810,7 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSStatus dropPipePlugin(TDropPipePluginReq req) {
- return configManager.dropPipePlugin(req.getPluginName());
+ return configManager.dropPipePlugin(req);
}
@Override
@@ -1038,7 +1040,13 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSStatus dropPipe(String pipeName) {
- return configManager.dropPipe(pipeName);
+ return configManager.dropPipe(
+ new TDropPipeReq().setPipeName(pipeName).setIfExistsCondition(false));
+ }
+
+ @Override
+ public TSStatus dropPipeExtended(TDropPipeReq req) {
+ return configManager.dropPipe(req);
}
@Override
@@ -1068,7 +1076,13 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
@Override
public TSStatus dropTopic(String topicName) {
- return configManager.dropTopic(topicName);
+ return configManager.dropTopic(
+ new
TDropTopicReq().setTopicName(topicName).setIfExistsCondition(false));
+ }
+
+ @Override
+ public TSStatus dropTopicExtended(TDropTopicReq req) throws TException {
+ return configManager.dropTopic(req);
}
@Override
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
index a697adcdec8..f3571bed57e 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/PipeInfoTest.java
@@ -152,7 +152,7 @@ public class PipeInfoTest {
pipeInfo.getPipePluginInfo().createPipePlugin(createPipePluginPlan);
// Drop pipe plugin test plugin
- pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName);
+ pipeInfo.getPipePluginInfo().validateBeforeDroppingPipePlugin(pluginName,
false);
DropPipePluginPlan dropPipePluginPlan = new DropPipePluginPlan(pluginName);
pipeInfo.getPipePluginInfo().dropPipePlugin(dropPipePluginPlan);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
index 8405f2d4675..93b95a3f336 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/CreatePipePluginProcedureTest.java
@@ -41,7 +41,7 @@ public class CreatePipePluginProcedureTest {
PipePluginMeta pipePluginMeta =
new PipePluginMeta("test", "test.class", false, "test.jar",
"testMD5test");
CreatePipePluginProcedure proc =
- new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3});
+ new CreatePipePluginProcedure(pipePluginMeta, new byte[] {1, 2, 3},
false);
try {
proc.serialize(outputStream);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
index c29f5bd7fb5..232a5fbee59 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/plugin/DropPipePluginProcedureTest.java
@@ -36,7 +36,7 @@ public class DropPipePluginProcedureTest {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
- DropPipePluginProcedure proc = new DropPipePluginProcedure("test");
+ DropPipePluginProcedure proc = new DropPipePluginProcedure("test", false);
try {
proc.serialize(outputStream);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index ede990bf15c..c897d3000c3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -83,6 +83,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllPipeInfoResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetAllSubscriptionInfoResp;
@@ -973,6 +975,12 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.dropPipe(pipeName), status ->
!updateConfigNodeLeader(status));
}
+ @Override
+ public TSStatus dropPipeExtended(TDropPipeReq req) throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.dropPipeExtended(req), status ->
!updateConfigNodeLeader(status));
+ }
+
@Override
public TShowPipeResp showPipe(TShowPipeReq req) throws TException {
return executeRemoteCallWithRetry(
@@ -997,6 +1005,12 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.dropTopic(topicName), status ->
!updateConfigNodeLeader(status));
}
+ @Override
+ public TSStatus dropTopicExtended(TDropTopicReq req) throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.dropTopicExtended(req), status ->
!updateConfigNodeLeader(status));
+ }
+
@Override
public TShowTopicResp showTopic(TShowTopicReq req) throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 8587fc606dd..8b6d53582eb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -75,6 +75,8 @@ import
org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipePluginReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropPipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropTopicReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetPipePluginTableResp;
@@ -181,6 +183,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
@@ -856,6 +859,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
client.createPipePlugin(
new TCreatePipePluginReq()
.setPluginName(pluginName)
+
.setIfNotExistsCondition(createPipePluginStatement.hasIfNotExistsCondition())
.setClassName(className)
.setJarFile(jarFile)
.setJarMD5(jarMd5)
@@ -882,13 +886,21 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
}
@Override
- public SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName) {
+ public SettableFuture<ConfigTaskResult> dropPipePlugin(
+ DropPipePluginStatement dropPipePluginStatement) {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (final ConfigNodeClient client =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TSStatus executionStatus = client.dropPipePlugin(new
TDropPipePluginReq(pluginName));
+ final TSStatus executionStatus =
+ client.dropPipePlugin(
+ new TDropPipePluginReq()
+ .setPluginName(dropPipePluginStatement.getPluginName())
+
.setIfExistsCondition(dropPipePluginStatement.hasIfExistsCondition()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
executionStatus.getCode()) {
- LOGGER.warn("[{}] Failed to drop pipe plugin {}.", executionStatus,
pluginName);
+ LOGGER.warn(
+ "[{}] Failed to drop pipe plugin {}.",
+ executionStatus,
+ dropPipePluginStatement.getPluginName());
future.setException(new IoTDBException(executionStatus.message,
executionStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
@@ -1752,6 +1764,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
TCreatePipeReq req =
new TCreatePipeReq()
.setPipeName(createPipeStatement.getPipeName())
+
.setIfNotExistsCondition(createPipeStatement.hasIfNotExistsCondition())
.setExtractorAttributes(createPipeStatement.getExtractorAttributes())
.setProcessorAttributes(createPipeStatement.getProcessorAttributes())
.setConnectorAttributes(createPipeStatement.getConnectorAttributes());
@@ -1821,6 +1834,7 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
alterPipeStatement.isReplaceAllConnectorAttributes());
req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes());
req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes());
+ req.setIfExistsCondition(alterPipeStatement.hasIfExistsCondition());
final TSStatus tsStatus = configNodeClient.alterPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn("Failed to alter pipe {} in config node, status is {}.",
pipeName, tsStatus);
@@ -1884,7 +1898,11 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TSStatus tsStatus =
configNodeClient.dropPipe(dropPipeStatement.getPipeName());
+ final TSStatus tsStatus =
+ configNodeClient.dropPipeExtended(
+ new TDropPipeReq()
+ .setPipeName(dropPipeStatement.getPipeName())
+
.setIfExistsCondition(dropPipeStatement.hasIfExistsCondition()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to drop pipe {}, status is {}.",
dropPipeStatement.getPipeName(), tsStatus);
@@ -2032,7 +2050,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try (final ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
final TCreateTopicReq req =
- new
TCreateTopicReq().setTopicName(topicName).setTopicAttributes(topicAttributes);
+ new TCreateTopicReq()
+ .setTopicName(topicName)
+
.setIfNotExistsCondition(createTopicStatement.hasIfNotExistsCondition())
+ .setTopicAttributes(topicAttributes);
final TSStatus tsStatus = configNodeClient.createTopic(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn("Failed to create topic {} in config node, status is {}.",
topicName, tsStatus);
@@ -2051,7 +2072,11 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
final SettableFuture<ConfigTaskResult> future = SettableFuture.create();
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
- final TSStatus tsStatus =
configNodeClient.dropTopic(dropTopicStatement.getTopicName());
+ final TSStatus tsStatus =
+ configNodeClient.dropTopicExtended(
+ new TDropTopicReq()
+
.setIfExistsCondition(dropTopicStatement.hasIfExistsCondition())
+ .setTopicName(dropTopicStatement.getTopicName()));
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn(
"Failed to drop topic {}, status is {}.",
dropTopicStatement.getTopicName(), tsStatus);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 05ad88bfc5e..fad6ec99cba 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -49,6 +49,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTTLStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.AlterPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipePluginStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.CreatePipeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipePluginStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.DropPipeStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.ShowPipesStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.metadata.pipe.StartPipeStatement;
@@ -105,7 +106,7 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> createPipePlugin(CreatePipePluginStatement
createPipeStatement);
- SettableFuture<ConfigTaskResult> dropPipePlugin(String pluginName);
+ SettableFuture<ConfigTaskResult> dropPipePlugin(DropPipePluginStatement
dropPipePluginStatement);
SettableFuture<ConfigTaskResult> showPipePlugins();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
index 82f9a325706..eb57f958430 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/DropPipePluginTask.java
@@ -28,15 +28,15 @@ import com.google.common.util.concurrent.ListenableFuture;
public class DropPipePluginTask implements IConfigTask {
- private final String pluginName;
+ private final DropPipePluginStatement dropPipePluginStatement;
public DropPipePluginTask(DropPipePluginStatement dropPipePluginStatement) {
- this.pluginName = dropPipePluginStatement.getPluginName();
+ this.dropPipePluginStatement = dropPipePluginStatement;
}
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
throws InterruptedException {
- return configTaskExecutor.dropPipePlugin(pluginName);
+ return configTaskExecutor.dropPipePlugin(dropPipePluginStatement);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index dfc2ee7964b..7235ffbc518 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -958,6 +958,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
public Statement
visitCreatePipePlugin(IoTDBSqlParser.CreatePipePluginContext ctx) {
return new CreatePipePluginStatement(
parseIdentifier(ctx.pluginName.getText()),
+ ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null,
parseStringLiteral(ctx.className.getText()),
parseAndValidateURI(ctx.uriClause()));
}
@@ -965,7 +966,10 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
// Drop PipePlugin
=====================================================================
@Override
public Statement visitDropPipePlugin(IoTDBSqlParser.DropPipePluginContext
ctx) {
- return new
DropPipePluginStatement(parseIdentifier(ctx.pluginName.getText()));
+ final DropPipePluginStatement dropPipePluginStatement = new
DropPipePluginStatement();
+
dropPipePluginStatement.setPluginName(parseIdentifier(ctx.pluginName.getText()));
+ dropPipePluginStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() !=
null);
+ return dropPipePluginStatement;
}
// Show PipePlugins
=====================================================================
@@ -3701,6 +3705,10 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
throw new SemanticException(
"Not support for this sql in CREATE PIPE, please enter pipe name.");
}
+
+ createPipeStatement.setIfNotExists(
+ ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);
+
if (ctx.extractorAttributesClause() != null) {
createPipeStatement.setExtractorAttributes(
parseExtractorAttributesClause(
@@ -3731,6 +3739,8 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
"Not support for this sql in ALTER PIPE, please enter pipe name.");
}
+ alterPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null);
+
if (ctx.alterExtractorAttributesClause() != null) {
alterPipeStatement.setExtractorAttributes(
parseExtractorAttributesClause(
@@ -3763,6 +3773,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
alterPipeStatement.setConnectorAttributes(new HashMap<>());
alterPipeStatement.setReplaceAllConnectorAttributes(false);
}
+
return alterPipeStatement;
}
@@ -3809,6 +3820,8 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
throw new SemanticException("Not support for this sql in DROP PIPE,
please enter pipename.");
}
+ dropPipeStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null);
+
return dropPipeStatement;
}
@@ -3861,6 +3874,9 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
"Not support for this sql in CREATE TOPIC, please enter topicName.");
}
+ createTopicStatement.setIfNotExists(
+ ctx.IF() != null && ctx.NOT() != null && ctx.EXISTS() != null);
+
if (ctx.topicAttributesClause() != null) {
createTopicStatement.setTopicAttributes(
parseTopicAttributesClause(ctx.topicAttributesClause().topicAttributeClause()));
@@ -3893,6 +3909,8 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
"Not support for this sql in DROP TOPIC, please enter topicName.");
}
+ dropTopicStatement.setIfExists(ctx.IF() != null && ctx.EXISTS() != null);
+
return dropTopicStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 0c0bef6c274..de5dc3c1d59 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -37,6 +37,7 @@ import java.util.Map;
public class AlterPipeStatement extends Statement implements IConfigStatement {
private String pipeName;
+ private boolean ifExistsCondition;
private Map<String, String> extractorAttributes;
private Map<String, String> processorAttributes;
private Map<String, String> connectorAttributes;
@@ -52,6 +53,10 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
return pipeName;
}
+ public boolean hasIfExistsCondition() {
+ return ifExistsCondition;
+ }
+
public Map<String, String> getExtractorAttributes() {
return extractorAttributes;
}
@@ -80,6 +85,10 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
this.pipeName = pipeName;
}
+ public void setIfExists(boolean ifExistsCondition) {
+ this.ifExistsCondition = ifExistsCondition;
+ }
+
public void setExtractorAttributes(Map<String, String> extractorAttributes) {
this.extractorAttributes = extractorAttributes;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
index f954cc347dc..f0975b56012 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipePluginStatement.java
@@ -36,13 +36,16 @@ import java.util.List;
public class CreatePipePluginStatement extends Statement implements
IConfigStatement {
private final String pluginName;
+ private final boolean ifNotExistsCondition;
private final String className;
private final String uriString;
- public CreatePipePluginStatement(String pluginName, String className, String
uriString) {
+ public CreatePipePluginStatement(
+ String pluginName, boolean ifNotExistsCondition, String className,
String uriString) {
super();
statementType = StatementType.CREATE_PIPEPLUGIN;
this.pluginName = pluginName;
+ this.ifNotExistsCondition = ifNotExistsCondition;
this.className = className;
this.uriString = uriString;
}
@@ -51,6 +54,10 @@ public class CreatePipePluginStatement extends Statement
implements IConfigState
return pluginName;
}
+ public boolean hasIfNotExistsCondition() {
+ return ifNotExistsCondition;
+ }
+
public String getClassName() {
return className;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
index 45a1a4664cb..a7b7471ffd0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/CreatePipeStatement.java
@@ -37,6 +37,7 @@ import java.util.Map;
public class CreatePipeStatement extends Statement implements IConfigStatement
{
private String pipeName;
+ private boolean ifNotExistsCondition;
private Map<String, String> extractorAttributes;
private Map<String, String> processorAttributes;
private Map<String, String> connectorAttributes;
@@ -49,6 +50,10 @@ public class CreatePipeStatement extends Statement
implements IConfigStatement {
return pipeName;
}
+ public boolean hasIfNotExistsCondition() {
+ return ifNotExistsCondition;
+ }
+
public Map<String, String> getExtractorAttributes() {
return extractorAttributes;
}
@@ -65,6 +70,10 @@ public class CreatePipeStatement extends Statement
implements IConfigStatement {
this.pipeName = pipeName;
}
+ public void setIfNotExists(boolean ifNotExistsCondition) {
+ this.ifNotExistsCondition = ifNotExistsCondition;
+ }
+
public void setExtractorAttributes(Map<String, String> extractorAttributes) {
this.extractorAttributes = extractorAttributes;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
index 49807d21a24..c7f4ebfd5df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipePluginStatement.java
@@ -35,18 +35,30 @@ import java.util.List;
public class DropPipePluginStatement extends Statement implements
IConfigStatement {
- private final String pluginName;
+ private String pluginName;
+ private boolean ifExistsCondition;
- public DropPipePluginStatement(String pluginName) {
+ public DropPipePluginStatement() {
super();
statementType = StatementType.DROP_PIPEPLUGIN;
- this.pluginName = pluginName;
}
public String getPluginName() {
return pluginName;
}
+ public boolean hasIfExistsCondition() {
+ return ifExistsCondition;
+ }
+
+ public void setPluginName(String pluginName) {
+ this.pluginName = pluginName;
+ }
+
+ public void setIfExists(boolean ifExistsCondition) {
+ this.ifExistsCondition = ifExistsCondition;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
index a7d54a3086f..a3403e00e68 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/DropPipeStatement.java
@@ -36,11 +36,16 @@ import java.util.List;
public class DropPipeStatement extends Statement implements IConfigStatement {
private String pipeName;
+ private boolean ifExistsCondition;
public DropPipeStatement(StatementType dropPipeStatement) {
this.statementType = dropPipeStatement;
}
+ public boolean hasIfExistsCondition() {
+ return ifExistsCondition;
+ }
+
public String getPipeName() {
return pipeName;
}
@@ -49,6 +54,10 @@ public class DropPipeStatement extends Statement implements
IConfigStatement {
this.pipeName = pipeName;
}
+ public void setIfExists(boolean ifExistsCondition) {
+ this.ifExistsCondition = ifExistsCondition;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
index 6482975b038..a98a1d5d273 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/CreateTopicStatement.java
@@ -37,7 +37,7 @@ import java.util.Map;
public class CreateTopicStatement extends Statement implements
IConfigStatement {
private String topicName;
-
+ private boolean ifNotExistsCondition;
private Map<String, String> topicAttributes;
public CreateTopicStatement() {
@@ -49,6 +49,10 @@ public class CreateTopicStatement extends Statement
implements IConfigStatement
return topicName;
}
+ public boolean hasIfNotExistsCondition() {
+ return ifNotExistsCondition;
+ }
+
public Map<String, String> getTopicAttributes() {
return topicAttributes;
}
@@ -57,6 +61,10 @@ public class CreateTopicStatement extends Statement
implements IConfigStatement
this.topicName = topicName;
}
+ public void setIfNotExists(boolean ifNotExistsCondition) {
+ this.ifNotExistsCondition = ifNotExistsCondition;
+ }
+
public void setTopicAttributes(Map<String, String> topicAttributes) {
this.topicAttributes = topicAttributes;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
index e366b1c5eb3..36525b1846e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/subscription/DropTopicStatement.java
@@ -35,6 +35,7 @@ import java.util.List;
public class DropTopicStatement extends Statement implements IConfigStatement {
private String topicName;
+ private boolean ifExistsCondition;
public DropTopicStatement() {
super();
@@ -45,10 +46,18 @@ public class DropTopicStatement extends Statement
implements IConfigStatement {
return topicName;
}
+ public boolean hasIfExistsCondition() {
+ return ifExistsCondition;
+ }
+
public void setTopicName(String topicName) {
this.topicName = topicName;
}
+ public void setIfExists(boolean ifExistsCondition) {
+ this.ifExistsCondition = ifExistsCondition;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 4dcce1c49ff..7044bd0b221 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -673,10 +673,12 @@ struct TCreatePipePluginReq {
3: required string jarName
4: required binary jarFile
5: required string jarMD5
+ 6: optional bool ifNotExistsCondition
}
struct TDropPipePluginReq {
1: required string pluginName
+ 2: optional bool ifExistsCondition
}
// Get PipePlugin table from config node
@@ -709,6 +711,7 @@ struct TCreatePipeReq {
2: optional map<string, string> extractorAttributes
3: optional map<string, string> processorAttributes
4: required map<string, string> connectorAttributes
+ 5: optional bool ifNotExistsCondition
}
struct TAlterPipeReq {
@@ -719,6 +722,12 @@ struct TAlterPipeReq {
5: required bool isReplaceAllConnectorAttributes
6: optional map<string, string> extractorAttributes
7: optional bool isReplaceAllExtractorAttributes
+ 8: optional bool ifExistsCondition
+}
+
+struct TDropPipeReq {
+ 1: required string pipeName
+ 2: optional bool ifExistsCondition
}
// Deprecated, restored for compatibility
@@ -773,6 +782,12 @@ struct TAlterLogicalViewReq {
struct TCreateTopicReq {
1: required string topicName
2: optional map<string, string> topicAttributes
+ 3: optional bool ifNotExistsCondition
+}
+
+struct TDropTopicReq {
+ 1: required string topicName
+ 2: optional bool ifExistsCondition
}
struct TShowTopicReq {
@@ -1484,6 +1499,9 @@ service IConfigNodeRPCService {
/** Drop Pipe */
common.TSStatus dropPipe(string pipeName)
+ /** Drop Pipe */
+ common.TSStatus dropPipeExtended(TDropPipeReq req)
+
/** Show Pipe by name, if name is empty, show all Pipe */
TShowPipeResp showPipe(TShowPipeReq req)
@@ -1505,6 +1523,9 @@ service IConfigNodeRPCService {
/** Drop Topic */
common.TSStatus dropTopic(string topicName)
+ /** Drop Topic */
+ common.TSStatus dropTopicExtended(TDropTopicReq req)
+
/** Show Topic by name, if name is empty, show all Topic */
TShowTopicResp showTopic(TShowTopicReq req)