This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fd4337eb0af Pipe: support replace and modify mode for alter pipe sql
(#12018)
fd4337eb0af is described below
commit fd4337eb0af18000babaaf3f16a6643b520fd3b8
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Feb 6 10:47:59 2024 +0800
Pipe: support replace and modify mode for alter pipe sql (#12018)
---
.../org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java | 243 +++++++++++++++++++--
.../api/customizer/parameter/PipeParameters.java | 39 ++--
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 12 +-
.../confignode/persistence/pipe/PipeTaskInfo.java | 73 ++++---
.../impl/pipe/task/AlterPipeProcedureV2.java | 16 +-
.../impl/pipe/task/AlterPipeProcedureV2Test.java | 5 +-
.../config/executor/ClusterConfigTaskExecutor.java | 28 +--
.../db/queryengine/plan/parser/ASTVisitor.java | 15 +-
.../metadata/pipe/AlterPipeStatement.java | 18 ++
.../src/main/thrift/confignode.thrift | 6 +-
10 files changed, 352 insertions(+), 103 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java
index a5534b32411..dfed53ab9c1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.pipe.it;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2;
@@ -34,7 +35,10 @@ import org.junit.runner.RunWith;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import static org.junit.Assert.fail;
@@ -43,13 +47,13 @@ import static org.junit.Assert.fail;
public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
@Test
- public void testBasicPipeAlter() throws Exception {
+ public void testBasicAlterPipe() throws Exception {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
// create pipe
String sql =
String.format(
- "create pipe a2b with sink ('node-urls'='%s',
'batch.enable'='false')",
+ "create pipe a2b with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString());
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
@@ -58,15 +62,24 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
fail(e.getMessage());
}
- // show pipe and record first creation time
- long firstCreationTime;
+ // show pipe
+ long lastCreationTime;
try (SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
-
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
+ // check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
- firstCreationTime = showPipeResult.get(0).creationTime;
+ // check configurations
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // record last creation time
+ lastCreationTime = showPipeResult.get(0).creationTime;
}
// stop pipe
@@ -82,17 +95,14 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
+ // check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
}
- // alter pipe
- sql =
- String.format(
- "alter pipe a2b modify sink ('node-urls'='%s',
'batch.enable'='true')",
- receiverDataNode.getIpAndPortString());
+ // alter pipe (modify)
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
- statement.execute(sql);
+ statement.execute("alter pipe a2b modify sink
('sink.batch.enable'='false')");
} catch (SQLException e) {
fail(e.getMessage());
}
@@ -102,17 +112,159 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT
{
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
Assert.assertEquals(1, showPipeResult.size());
-
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+ // check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
- // alter pipe will reset pipe creation time
- Assert.assertTrue(showPipeResult.get(0).creationTime >
firstCreationTime);
- // alter pipe will clear exception messages
+ // check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
+ // start pipe
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("start pipe a2b");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // alter pipe (replace)
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b replace processor
('processor'='down-sampling-processor')");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // check configurations
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
+ // alter pipe (modify)
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify sink
('connector.batch.enable'='true')");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
+ // alter pipe (replace empty)
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b replace processor ()");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+ Assert.assertFalse(
+
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
+ // alter pipe (modify empty)
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify sink ()");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+ Assert.assertFalse(
+
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // check exception message
Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
}
}
@Test
- public void testPipeAlterFailure() {
+ public void testAlterPipeFailure() {
DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
// alter non-existed pipe
@@ -138,17 +290,66 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
} catch (SQLException e) {
fail(e.getMessage());
}
+ }
- // useless alter
- sql =
+ @Test
+ public void testAlterPipeProcessor() {
+ DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // create pipe
+ String sql =
String.format(
- "alter pipe a2b modify sink ('node-urls'='%s',
'batch.enable'='false')",
+ "create pipe a2b with processor
('processor'='down-sampling-processor', 'down-sampling.interval-seconds'='1',
'down-sampling.split-file'='true') with sink ('node-urls'='%s',
'batch.enable'='false')",
receiverDataNode.getIpAndPortString());
try (Connection connection = senderEnv.getConnection();
Statement statement = connection.createStatement()) {
statement.execute(sql);
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // insert data on sender
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2),
(2000, 3), (2500, 4), (3000, 5)",
+ "flush"))) {
+ fail();
+ }
+
+ // check data on receiver
+ Set<String> expectedResSet = new HashSet<>();
+ expectedResSet.add("1000,1.0,");
+ expectedResSet.add("2000,3.0,");
+ expectedResSet.add("3000,5.0,");
+ TestUtils.assertDataOnEnv(
+ receiverEnv, "select * from root.**", "Time,root.db.d1.at1,",
expectedResSet);
+
+ // alter pipe (modify 'down-sampling.interval-seconds')
+ try (Connection connection = senderEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify processor
('down-sampling.interval-seconds'='2')");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // insert data on sender
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (11000, 1), (11500, 2),
(12000, 3), (12500, 4), (13000, 5)",
+ "flush"))) {
fail();
- } catch (SQLException ignore) {
}
+
+ // check data on receiver
+ expectedResSet.clear();
+ expectedResSet.add("11000,1.0,");
+ expectedResSet.add("13000,5.0,");
+ TestUtils.assertDataOnEnv(
+ receiverEnv,
+ "select * from root.** where time > 10000",
+ "Time,root.db.d1.at1,",
+ expectedResSet);
}
}
diff --git
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index aee06624f7f..3bf0f0bb688 100644
---
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
-import java.util.TreeMap;
import java.util.stream.Collectors;
/**
@@ -281,30 +280,25 @@ public class PipeParameters {
}
/**
- * This method uses {@link KeyReducer} to reduce keys and then sorts them to
determine if two
- * PipeParameters are equivalent.
+ * This method adds (if absent) or replaces (if present) equivalent
attributes in this
+ * PipeParameters with those from another PipeParameters.
+ *
+ * @param that provides the keys to be updated along with their values
+ * @return this PipeParameters
*/
- public boolean isEquivalent(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- PipeParameters that = (PipeParameters) obj;
- return Objects.equals(
+ public PipeParameters addOrReplaceEquivalentAttributes(PipeParameters that) {
+ Map<String, Entry<String, String>> thisMap =
this.attributes.entrySet().stream()
- .collect(
- Collectors.toMap(
- entry -> KeyReducer.reduce(entry.getKey()),
Entry::getValue,
- (oldValue, newValue) -> oldValue, TreeMap::new))
- .toString(),
+ .collect(Collectors.toMap(entry ->
KeyReducer.reduce(entry.getKey()), entry -> entry));
+ Map<String, Entry<String, String>> thatMap =
that.attributes.entrySet().stream()
- .collect(
- Collectors.toMap(
- entry -> KeyReducer.reduce(entry.getKey()),
Entry::getValue,
- (oldValue, newValue) -> oldValue, TreeMap::new))
- .toString());
+ .collect(Collectors.toMap(entry ->
KeyReducer.reduce(entry.getKey()), entry -> entry));
+ thatMap.forEach(
+ (key, entry) -> {
+ this.attributes.remove(thisMap.getOrDefault(key, entry).getKey());
+ this.attributes.put(entry.getKey(), entry.getValue());
+ });
+ return this;
}
private static class KeyReducer {
@@ -334,6 +328,7 @@ public class PipeParameters {
}
public static class ValueHider {
+
private static final Set<String> KEYS = new HashSet<>();
private static final String PLACEHOLDER = "******";
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index af9cbb8ee84..d64bbd4d1d6 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -563,19 +563,19 @@ connectorAttributeClause
alterPipe
: ALTER PIPE pipeName=identifier
- modifyProcessorAttributesClause?
- modifyConnectorAttributesClause?
+ alterProcessorAttributesClause?
+ alterConnectorAttributesClause?
;
-modifyProcessorAttributesClause
- : MODIFY PROCESSOR
+alterProcessorAttributesClause
+ : (MODIFY | REPLACE) PROCESSOR
LR_BRACKET
(processorAttributeClause COMMA)* processorAttributeClause?
RR_BRACKET
;
-modifyConnectorAttributesClause
- : MODIFY (CONNECTOR | SINK)
+alterConnectorAttributesClause
+ : (MODIFY | REPLACE) (CONNECTOR | SINK)
LR_BRACKET
(connectorAttributeClause COMMA)* connectorAttributeClause?
RR_BRACKET
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 196852406c5..10085a49357 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -54,6 +54,7 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -162,16 +163,18 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- public void checkBeforeAlterPipe(TAlterPipeReq alterPipeRequest) throws
PipeException {
+ public void checkAndUpdateRequestBeforeAlterPipe(TAlterPipeReq
alterPipeRequest)
+ throws PipeException {
acquireReadLock();
try {
- checkBeforeAlterPipeInternal(alterPipeRequest);
+ checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
} finally {
releaseReadLock();
}
}
- private void checkBeforeAlterPipeInternal(TAlterPipeReq alterPipeRequest)
throws PipeException {
+ private void checkAndUpdateRequestBeforeAlterPipeInternal(TAlterPipeReq
alterPipeRequest)
+ throws PipeException {
if (!isPipeExisted(alterPipeRequest.getPipeName())) {
final String exceptionMessage =
String.format(
@@ -180,30 +183,46 @@ public class PipeTaskInfo implements SnapshotProcessor {
throw new PipeException(exceptionMessage);
}
- PipeMeta pipeMetaFromCoordinator =
getPipeMetaByPipeName(alterPipeRequest.getPipeName());
- PipeStaticMeta pipeStaticMetaFromCoordinator =
pipeMetaFromCoordinator.getStaticMeta();
- // fill empty attributes and check useless alter from the perspective of CN
- boolean needToAlter = false;
- if (alterPipeRequest.getProcessorAttributes().isEmpty()) {
- alterPipeRequest.setProcessorAttributes(
-
pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute());
- } else if (!(new PipeParameters(alterPipeRequest.getProcessorAttributes()))
- .isEquivalent(pipeStaticMetaFromCoordinator.getProcessorParameters()))
{
- needToAlter = true;
- }
- if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
- alterPipeRequest.setConnectorAttributes(
-
pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
- } else if (!(new PipeParameters(alterPipeRequest.getConnectorAttributes())
-
.isEquivalent(pipeStaticMetaFromCoordinator.getConnectorParameters()))) {
- needToAlter = true;
- }
- if (!needToAlter) {
- final String exceptionMessage =
- String.format(
- "Failed to alter pipe %s, nothing to alter",
alterPipeRequest.getPipeName());
- LOGGER.warn(exceptionMessage);
- throw new PipeException(exceptionMessage);
+ PipeStaticMeta pipeStaticMetaFromCoordinator =
+ getPipeMetaByPipeName(alterPipeRequest.getPipeName()).getStaticMeta();
+ // deep copy current pipe static meta
+ PipeStaticMeta copiedPipeStaticMetaFromCoordinator =
+ new PipeStaticMeta(
+ pipeStaticMetaFromCoordinator.getPipeName(),
+ pipeStaticMetaFromCoordinator.getCreationTime(),
+ new
HashMap<>(pipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute()),
+ new
HashMap<>(pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute()),
+ new
HashMap<>(pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute()));
+
+ // 1. In modify mode, based on the passed attributes:
+ // 1.1. If they are empty, the request is filled with the original attributes.
+ // 1.2. Otherwise, corresponding updates on original attributes are
performed.
+ // 2. In replace mode, do nothing here.
+ if (!alterPipeRequest.isReplaceAllProcessorAttributes) { // modify mode
+ if (alterPipeRequest.getProcessorAttributes().isEmpty()) {
+ alterPipeRequest.setProcessorAttributes(
+
copiedPipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute());
+ } else {
+ alterPipeRequest.setProcessorAttributes(
+ copiedPipeStaticMetaFromCoordinator
+ .getProcessorParameters()
+ .addOrReplaceEquivalentAttributes(
+ new
PipeParameters(alterPipeRequest.getProcessorAttributes()))
+ .getAttribute());
+ }
+ }
+ if (!alterPipeRequest.isReplaceAllConnectorAttributes) { // modify mode
+ if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
+ alterPipeRequest.setConnectorAttributes(
+
copiedPipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
+ } else {
+ alterPipeRequest.setConnectorAttributes(
+ copiedPipeStaticMetaFromCoordinator
+ .getConnectorParameters()
+ .addOrReplaceEquivalentAttributes(
+ new
PipeParameters(alterPipeRequest.getConnectorAttributes()))
+ .getAttribute());
+ }
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 4b378a255fd..601943d0bd8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -80,15 +80,18 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
LOGGER.info(
"AlterPipeProcedureV2: executeFromValidateTask({})",
alterPipeRequest.getPipeName());
+ // We should execute checkAndUpdateRequestBeforeAlterPipe before checking the pipe plugin.
This method will
+ // update the alterPipeRequest based on its own contents and the existing
pipe metadata.
+ pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
+
final PipeManager pipeManager = env.getConfigManager().getPipeManager();
pipeManager
.getPipePluginCoordinator()
.getPipePluginInfo()
.checkPipePluginExistence(
- new HashMap<>(),
+ new HashMap<>(), // no need to check pipe source plugin
alterPipeRequest.getProcessorAttributes(),
alterPipeRequest.getConnectorAttributes());
- pipeTaskInfo.get().checkBeforeAlterPipe(alterPipeRequest);
return false;
}
@@ -112,7 +115,10 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
new PipeStaticMeta(
alterPipeRequest.getPipeName(),
System.currentTimeMillis(),
- new
HashMap<>(currentPipeStaticMeta.getExtractorParameters().getAttribute()),
+ new HashMap<>(
+ currentPipeStaticMeta
+ .getExtractorParameters()
+ .getAttribute()), // reuse pipe source plugin
new HashMap<>(alterPipeRequest.getProcessorAttributes()),
new HashMap<>(alterPipeRequest.getConnectorAttributes()));
@@ -254,6 +260,8 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
ReadWriteIOUtils.write(entry.getKey(), stream);
ReadWriteIOUtils.write(entry.getValue(), stream);
}
+ ReadWriteIOUtils.write(alterPipeRequest.isReplaceAllProcessorAttributes,
stream);
+ ReadWriteIOUtils.write(alterPipeRequest.isReplaceAllConnectorAttributes,
stream);
if (currentPipeStaticMeta != null) {
ReadWriteIOUtils.write(true, stream);
currentPipeStaticMeta.serialize(stream);
@@ -300,6 +308,8 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
.getConnectorAttributes()
.put(ReadWriteIOUtils.readString(byteBuffer),
ReadWriteIOUtils.readString(byteBuffer));
}
+ alterPipeRequest.isReplaceAllProcessorAttributes =
ReadWriteIOUtils.readBool(byteBuffer);
+ alterPipeRequest.isReplaceAllConnectorAttributes =
ReadWriteIOUtils.readBool(byteBuffer);
if (ReadWriteIOUtils.readBool(byteBuffer)) {
currentPipeStaticMeta = PipeStaticMeta.deserialize(byteBuffer);
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
index 326fcc791bc..f23d22a1638 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
@@ -48,10 +48,7 @@ public class AlterPipeProcedureV2Test {
AlterPipeProcedureV2 proc =
new AlterPipeProcedureV2(
- new TAlterPipeReq()
- .setPipeName("testPipe")
- .setProcessorAttributes(processorAttributes)
- .setConnectorAttributes(connectorAttributes));
+ new TAlterPipeReq("testPipe", processorAttributes,
connectorAttributes, false, true));
try {
proc.serialize(outputStream);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index c17b4c5e1bd..b4c09e362cb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1661,15 +1661,16 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
public SettableFuture<ConfigTaskResult> alterPipe(AlterPipeStatement
alterPipeStatement) {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- // Validate before alteration
+ // Validate before alteration - only validate replace mode
+ final String pipeName = alterPipeStatement.getPipeName();
try {
- if (!alterPipeStatement.getProcessorAttributes().isEmpty()) {
+ if (!alterPipeStatement.getProcessorAttributes().isEmpty()
+ && alterPipeStatement.isReplaceAllProcessorAttributes()) {
PipeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
}
- if (!alterPipeStatement.getConnectorAttributes().isEmpty()) {
- PipeAgent.plugin()
- .validateConnector(
- alterPipeStatement.getPipeName(),
alterPipeStatement.getConnectorAttributes());
+ if (!alterPipeStatement.getConnectorAttributes().isEmpty()
+ && alterPipeStatement.isReplaceAllConnectorAttributes()) {
+ PipeAgent.plugin().validateConnector(pipeName,
alterPipeStatement.getConnectorAttributes());
}
} catch (Exception e) {
LOGGER.info("Failed to validate pipe statement, because {}",
e.getMessage(), e);
@@ -1681,16 +1682,15 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
try (ConfigNodeClient configNodeClient =
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
TAlterPipeReq req =
- new TAlterPipeReq()
- .setPipeName(alterPipeStatement.getPipeName())
-
.setProcessorAttributes(alterPipeStatement.getProcessorAttributes())
-
.setConnectorAttributes(alterPipeStatement.getConnectorAttributes());
+ new TAlterPipeReq(
+ pipeName,
+ alterPipeStatement.getProcessorAttributes(),
+ alterPipeStatement.getConnectorAttributes(),
+ alterPipeStatement.isReplaceAllProcessorAttributes(),
+ alterPipeStatement.isReplaceAllConnectorAttributes());
TSStatus tsStatus = configNodeClient.alterPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
- LOGGER.warn(
- "Failed to alter pipe {} in config node, status is {}.",
- alterPipeStatement.getPipeName(),
- tsStatus);
+ LOGGER.warn("Failed to alter pipe {} in config node, status is {}.",
pipeName, tsStatus);
future.setException(new IoTDBException(tsStatus.message,
tsStatus.code));
} else {
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index fedc436cb32..fe96a9dcee2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -224,6 +224,7 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -3606,19 +3607,25 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
throw new SemanticException(
"Not support for this sql in ALTER PIPE, please enter pipe name.");
}
- if (ctx.modifyProcessorAttributesClause() != null) {
+ if (ctx.alterProcessorAttributesClause() != null) {
alterPipeStatement.setProcessorAttributes(
parseProcessorAttributesClause(
-
ctx.modifyProcessorAttributesClause().processorAttributeClause()));
+
ctx.alterProcessorAttributesClause().processorAttributeClause()));
+ alterPipeStatement.setReplaceAllProcessorAttributes(
+ Objects.nonNull(ctx.alterProcessorAttributesClause().REPLACE()));
} else {
alterPipeStatement.setProcessorAttributes(new HashMap<>());
+ alterPipeStatement.setReplaceAllProcessorAttributes(false);
}
- if (ctx.modifyConnectorAttributesClause() != null) {
+ if (ctx.alterConnectorAttributesClause() != null) {
alterPipeStatement.setConnectorAttributes(
parseConnectorAttributesClause(
-
ctx.modifyConnectorAttributesClause().connectorAttributeClause()));
+
ctx.alterConnectorAttributesClause().connectorAttributeClause()));
+ alterPipeStatement.setReplaceAllConnectorAttributes(
+ Objects.nonNull(ctx.alterConnectorAttributesClause().REPLACE()));
} else {
alterPipeStatement.setConnectorAttributes(new HashMap<>());
+ alterPipeStatement.setReplaceAllConnectorAttributes(false);
}
return alterPipeStatement;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 9fcdf2436dc..644d26ba0bb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -39,6 +39,8 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
private String pipeName;
private Map<String, String> processorAttributes;
private Map<String, String> connectorAttributes;
+ private boolean isReplaceAllProcessorAttributes;
+ private boolean isReplaceAllConnectorAttributes;
public AlterPipeStatement(StatementType alterPipeStatement) {
this.statementType = alterPipeStatement;
@@ -56,6 +58,14 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
return connectorAttributes;
}
+ public boolean isReplaceAllProcessorAttributes() {
+ return isReplaceAllProcessorAttributes;
+ }
+
+ public boolean isReplaceAllConnectorAttributes() {
+ return isReplaceAllConnectorAttributes;
+ }
+
public void setPipeName(String pipeName) {
this.pipeName = pipeName;
}
@@ -68,6 +78,14 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
this.connectorAttributes = connectorAttributes;
}
+ public void setReplaceAllProcessorAttributes(boolean
replaceAllProcessorAttributes) {
+ isReplaceAllProcessorAttributes = replaceAllProcessorAttributes;
+ }
+
+ public void setReplaceAllConnectorAttributes(boolean
replaceAllConnectorAttributes) {
+ isReplaceAllConnectorAttributes = replaceAllConnectorAttributes;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 52b4cccdc99..a04cfe588a5 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -701,8 +701,10 @@ struct TCreatePipeReq {
struct TAlterPipeReq {
1: required string pipeName
- 2: optional map<string, string> processorAttributes
- 3: optional map<string, string> connectorAttributes
+ 2: required map<string, string> processorAttributes
+ 3: required map<string, string> connectorAttributes
+ 4: required bool isReplaceAllProcessorAttributes
+ 5: required bool isReplaceAllConnectorAttributes
}
// Deprecated, restored for compatibility