This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 71a4ecb642f Pipe: support alter pipe source (#12932)
71a4ecb642f is described below
commit 71a4ecb642f3f292a33ef3d9a5a8f91bf01d47f0
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jul 17 19:31:50 2024 +0800
Pipe: support alter pipe source (#12932)
---
.../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 183 +++++++++++++++++++--
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 8 +
.../confignode/persistence/pipe/PipeTaskInfo.java | 15 ++
.../impl/pipe/task/AlterPipeProcedureV2.java | 52 +++++-
.../procedure/store/ProcedureFactory.java | 5 +-
.../confignode/procedure/store/ProcedureType.java | 1 +
.../impl/pipe/task/AlterPipeProcedureV2Test.java | 10 +-
...reV2Test.java => AlterPipeProcedureV3Test.java} | 19 ++-
.../config/executor/ClusterConfigTaskExecutor.java | 6 +
.../db/queryengine/plan/parser/ASTVisitor.java | 13 ++
.../metadata/pipe/AlterPipeStatement.java | 18 ++
.../src/main/thrift/confignode.thrift | 2 +
12 files changed, 306 insertions(+), 26 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index 2de8454eb51..fb541e7dcdb 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -53,7 +53,7 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualAutoIT {
// Create pipe
final String sql =
String.format(
- "create pipe a2b with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+ "create pipe a2b with source ('source'='iotdb-source',
'source.pattern'='root.test1', 'source.realtime.mode'='stream') with processor
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
receiverDataNode.getIpAndPortString());
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
@@ -71,6 +71,10 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualAutoIT
{
// Check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
// Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
Assert.assertTrue(
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
Assert.assertTrue(
@@ -99,6 +103,76 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
}
+ // Alter pipe (modify)
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify source
('source.pattern'='root.test2')");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // Show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // Check status
+ Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
+ // Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test2"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // Check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // Check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
+ // Alter pipe (replace)
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ "alter pipe a2b replace source ('source'='iotdb-source',
'source.path'='root.test1.**')");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // Show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // check status
+ Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
+ // check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test2"));
+ Assert.assertFalse(
+
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+ Assert.assertTrue(
+
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // Check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
// Alter pipe (modify)
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
@@ -115,6 +189,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// Check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
// Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
Assert.assertTrue(
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
@@ -155,6 +231,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
// check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
Assert.assertTrue(
showPipeResult
.get(0)
@@ -189,6 +267,80 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// Check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
// Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeProcessor
+ .contains("processor=tumbling-time-sampling-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // Check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // Check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
+ // Alter pipe (modify empty)
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify source ()");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeProcessor
+ .contains("processor=tumbling-time-sampling-processor"));
+ Assert.assertTrue(
+ showPipeResult
+ .get(0)
+ .pipeConnector
+ .contains(String.format("node-urls=%s",
receiverDataNode.getIpAndPortString())));
+ // Check creation time and record last creation time
+ Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+ lastCreationTime = showPipeResult.get(0).creationTime;
+ // Check exception message
+ Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+ }
+
+ // Alter pipe (replace empty)
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b replace source ()");
+ } catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ // show pipe
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TShowPipeInfo> showPipeResult = client.showPipe(new
TShowPipeReq()).pipeInfoList;
+ Assert.assertEquals(1, showPipeResult.size());
+ // check status
+ Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+ // check configurations
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
Assert.assertTrue(
showPipeResult
@@ -223,6 +375,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
// check configurations
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
Assert.assertFalse(
showPipeResult
@@ -305,13 +459,13 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
}
@Test
- public void testAlterPipeProcessor() {
+ public void testAlterPipeSourceAndProcessor() {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
// Create pipe
final String sql =
String.format(
- "create pipe a2b with processor
('processor'='tumbling-time-sampling-processor',
'processor.tumbling-time.interval-seconds'='1',
'processor.down-sampling.split-file'='true') with sink ('node-urls'='%s',
'batch.enable'='false')",
+ "create pipe a2b with source ('source' =
'iotdb-source','source.path' = 'root.db.d1.**') with processor
('processor'='tumbling-time-sampling-processor',
'processor.tumbling-time.interval-seconds'='1',
'processor.down-sampling.split-file'='true') with sink ('node-urls'='%s',
'batch.enable'='false')",
receiverDataNode.getIpAndPortString());
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
@@ -335,17 +489,26 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
expectedResSet.add("2000,3.0,");
expectedResSet.add("3000,5.0,");
TestUtils.assertDataEventuallyOnEnv(
- receiverEnv, "select * from root.**", "Time,root.db.d1.at1,",
expectedResSet);
+ receiverEnv, "select * from root.db.**", "Time,root.db.d1.at1,",
expectedResSet);
- // Alter pipe (modify 'processor.tumbling-time.interval-seconds')
+ // Alter pipe (modify 'source.path' and
'processor.tumbling-time.interval-seconds')
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
- "alter pipe a2b modify processor
('processor.tumbling-time.interval-seconds'='2')");
+ "alter pipe a2b modify source('source' =
'iotdb-source','source.path'='root.db.d2.**') modify processor
('processor.tumbling-time.interval-seconds'='2')");
} catch (SQLException e) {
fail(e.getMessage());
}
+ // Insert data on sender under root.db.d2, the path the altered source now matches
+ if (!TestUtils.tryExecuteNonQueriesWithRetry(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d2 (time, at1) values (11000, 1), (11500, 2),
(12000, 3), (12500, 4), (13000, 5)",
+ "flush"))) {
+ fail();
+ }
+
// Insert data on sender
if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
@@ -357,12 +520,12 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// Check data on receiver
expectedResSet.clear();
- expectedResSet.add("11000,1.0,");
- expectedResSet.add("13000,5.0,");
+ expectedResSet.add("11000,null,1.0,");
+ expectedResSet.add("13000,null,5.0,");
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
- "select * from root.** where time > 10000",
- "Time,root.db.d1.at1,",
+ "select * from root.db.** where time > 10000",
+ "Time,root.db.d1.at1,root.db.d2.at1,",
expectedResSet);
}
}
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 6b662cbe50e..e66c875c9a7 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -576,10 +576,18 @@ connectorAttributeClause
alterPipe
: ALTER PIPE pipeName=identifier
+ alterExtractorAttributesClause?
alterProcessorAttributesClause?
alterConnectorAttributesClause?
;
+alterExtractorAttributesClause
+ : (MODIFY | REPLACE) (EXTRACTOR | SOURCE)
+ LR_BRACKET
+ (extractorAttributeClause COMMA)* extractorAttributeClause?
+ RR_BRACKET
+ ;
+
alterProcessorAttributesClause
: (MODIFY | REPLACE) PROCESSOR
LR_BRACKET
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index ee864076a5b..c5a264c047a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -213,6 +213,20 @@ public class PipeTaskInfo implements SnapshotProcessor {
// 1.1. if they are empty, the original attributes are filled directly.
// 1.2. Otherwise, corresponding updates on original attributes are
performed.
// 2. In replace mode, do nothing here.
+ if (!alterPipeRequest.isReplaceAllExtractorAttributes) { // modify mode
+ if (alterPipeRequest.getExtractorAttributes().isEmpty()) {
+ alterPipeRequest.setExtractorAttributes(
+
copiedPipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute());
+ } else {
+ alterPipeRequest.setExtractorAttributes(
+ copiedPipeStaticMetaFromCoordinator
+ .getExtractorParameters()
+ .addOrReplaceEquivalentAttributes(
+ new
PipeParameters(alterPipeRequest.getExtractorAttributes()))
+ .getAttribute());
+ }
+ }
+
if (!alterPipeRequest.isReplaceAllProcessorAttributes) { // modify mode
if (alterPipeRequest.getProcessorAttributes().isEmpty()) {
alterPipeRequest.setProcessorAttributes(
@@ -226,6 +240,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
.getAttribute());
}
}
+
if (!alterPipeRequest.isReplaceAllConnectorAttributes) { // modify mode
if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
alterPipeRequest.setConnectorAttributes(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 6cccbbd7033..e59eb3a1896 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import
org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2;
import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
@@ -62,13 +63,25 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
private PipeRuntimeMeta currentPipeRuntimeMeta;
private PipeRuntimeMeta updatedPipeRuntimeMeta;
- public AlterPipeProcedureV2() {
+ private ProcedureType procedureType;
+
+ public AlterPipeProcedureV2(ProcedureType procedureType) {
super();
+ this.procedureType = procedureType;
}
public AlterPipeProcedureV2(TAlterPipeReq alterPipeRequest) throws
PipeException {
super();
this.alterPipeRequest = alterPipeRequest;
+ procedureType = ProcedureType.ALTER_PIPE_PROCEDURE_V3;
+ }
+
+ @TestOnly
+ public AlterPipeProcedureV2(TAlterPipeReq alterPipeRequest, ProcedureType
procedureType)
+ throws PipeException {
+ super();
+ this.alterPipeRequest = alterPipeRequest;
+ this.procedureType = procedureType;
}
@Override
@@ -90,7 +103,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
.getPipePluginCoordinator()
.getPipePluginInfo()
.checkPipePluginExistence(
- new HashMap<>(), // no need to check pipe source plugin
+ alterPipeRequest.getExtractorAttributes(),
alterPipeRequest.getProcessorAttributes(),
alterPipeRequest.getConnectorAttributes());
@@ -116,10 +129,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
new PipeStaticMeta(
alterPipeRequest.getPipeName(),
System.currentTimeMillis(),
- new HashMap<>(
- currentPipeStaticMeta
- .getExtractorParameters()
- .getAttribute()), // reuse pipe source plugin
+ new HashMap<>(alterPipeRequest.getExtractorAttributes()),
new HashMap<>(alterPipeRequest.getProcessorAttributes()),
new HashMap<>(alterPipeRequest.getConnectorAttributes()));
@@ -259,7 +269,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
@Override
public void serialize(DataOutputStream stream) throws IOException {
- stream.writeShort(ProcedureType.ALTER_PIPE_PROCEDURE_V2.getTypeCode());
+ stream.writeShort(procedureType.getTypeCode());
super.serialize(stream);
ReadWriteIOUtils.write(alterPipeRequest.getPipeName(), stream);
ReadWriteIOUtils.write(alterPipeRequest.getProcessorAttributesSize(),
stream);
@@ -298,6 +308,15 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
} else {
ReadWriteIOUtils.write(false, stream);
}
+
+ if (procedureType.getTypeCode() ==
ProcedureType.ALTER_PIPE_PROCEDURE_V3.getTypeCode()) {
+ ReadWriteIOUtils.write(alterPipeRequest.getExtractorAttributesSize(),
stream);
+ for (Map.Entry<String, String> entry :
alterPipeRequest.getExtractorAttributes().entrySet()) {
+ ReadWriteIOUtils.write(entry.getKey(), stream);
+ ReadWriteIOUtils.write(entry.getValue(), stream);
+ }
+ ReadWriteIOUtils.write(alterPipeRequest.isReplaceAllExtractorAttributes,
stream);
+ }
}
@Override
@@ -306,8 +325,10 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
alterPipeRequest =
new TAlterPipeReq()
.setPipeName(ReadWriteIOUtils.readString(byteBuffer))
+ .setExtractorAttributes(new HashMap<>())
.setProcessorAttributes(new HashMap<>())
.setConnectorAttributes(new HashMap<>());
+
int size = ReadWriteIOUtils.readInt(byteBuffer);
for (int i = 0; i < size; ++i) {
alterPipeRequest
@@ -334,6 +355,18 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
if (ReadWriteIOUtils.readBool(byteBuffer)) {
updatedPipeRuntimeMeta = PipeRuntimeMeta.deserialize(byteBuffer);
}
+ if (procedureType.getTypeCode() ==
ProcedureType.ALTER_PIPE_PROCEDURE_V3.getTypeCode()) {
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < size; ++i) {
+ alterPipeRequest
+ .getExtractorAttributes()
+ .put(ReadWriteIOUtils.readString(byteBuffer),
ReadWriteIOUtils.readString(byteBuffer));
+ }
+ alterPipeRequest.isReplaceAllExtractorAttributes =
ReadWriteIOUtils.readBool((byteBuffer));
+ } else {
+ alterPipeRequest.setExtractorAttributes(new HashMap<>());
+ alterPipeRequest.isReplaceAllExtractorAttributes = false;
+ }
}
@Override
@@ -346,6 +379,10 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
}
AlterPipeProcedureV2 that = (AlterPipeProcedureV2) o;
return
this.alterPipeRequest.getPipeName().equals(that.alterPipeRequest.getPipeName())
+ && this.alterPipeRequest
+ .getExtractorAttributes()
+ .toString()
+ .equals(that.alterPipeRequest.getExtractorAttributes().toString())
&& this.alterPipeRequest
.getProcessorAttributes()
.toString()
@@ -360,6 +397,7 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
public int hashCode() {
return Objects.hash(
alterPipeRequest.getPipeName(),
+ alterPipeRequest.getExtractorAttributes(),
alterPipeRequest.getProcessorAttributes(),
alterPipeRequest.getConnectorAttributes());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 804f6dcc2a1..4a7c9d008cf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -153,7 +153,10 @@ public class ProcedureFactory implements IProcedureFactory
{
procedure = new DropPipeProcedureV2();
break;
case ALTER_PIPE_PROCEDURE_V2:
- procedure = new AlterPipeProcedureV2();
+ procedure = new
AlterPipeProcedureV2(ProcedureType.ALTER_PIPE_PROCEDURE_V2);
+ break;
+ case ALTER_PIPE_PROCEDURE_V3:
+ procedure = new
AlterPipeProcedureV2(ProcedureType.ALTER_PIPE_PROCEDURE_V3);
break;
case PIPE_HANDLE_LEADER_CHANGE_PROCEDURE:
procedure = new PipeHandleLeaderChangeProcedure();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index a781b54fc8a..683365b1dfc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -74,6 +74,7 @@ public enum ProcedureType {
STOP_PIPE_PROCEDURE_V2((short) 1002),
DROP_PIPE_PROCEDURE_V2((short) 1003),
ALTER_PIPE_PROCEDURE_V2((short) 1004),
+ ALTER_PIPE_PROCEDURE_V3((short) 1005),
/** Pipe Runtime */
PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100),
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
index 1d53a1a72b5..cf51ad98780 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
import org.apache.tsfile.utils.PublicBAOS;
@@ -39,16 +40,21 @@ public class AlterPipeProcedureV2Test {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+ Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
Map<String, String> connectorAttributes = new HashMap<>();
+
processorAttributes.put("processor", "do-nothing-processor");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");
+ TAlterPipeReq req =
+ new TAlterPipeReq("testPipe", processorAttributes,
connectorAttributes, false, true);
+ req.setExtractorAttributes(extractorAttributes);
+ req.setIsReplaceAllExtractorAttributes(false);
AlterPipeProcedureV2 proc =
- new AlterPipeProcedureV2(
- new TAlterPipeReq("testPipe", processorAttributes,
connectorAttributes, false, true));
+ new AlterPipeProcedureV2(req, ProcedureType.ALTER_PIPE_PROCEDURE_V2);
try {
proc.serialize(outputStream);
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV3Test.java
similarity index 78%
copy from
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
copy to
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV3Test.java
index 1d53a1a72b5..3e1c460f855 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV3Test.java
@@ -33,31 +33,38 @@ import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-public class AlterPipeProcedureV2Test {
+public class AlterPipeProcedureV3Test {
+
@Test
public void serializeDeserializeTest() {
PublicBAOS byteArrayOutputStream = new PublicBAOS();
DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream);
+ Map<String, String> extractorAttributes = new HashMap<>();
Map<String, String> processorAttributes = new HashMap<>();
Map<String, String> connectorAttributes = new HashMap<>();
+
+ extractorAttributes.put("source", "iotdb-source");
+ extractorAttributes.put("source.pattern", "root.test1.wf01");
processorAttributes.put("processor", "do-nothing-processor");
connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("host", "127.0.0.1");
connectorAttributes.put("port", "6667");
- AlterPipeProcedureV2 proc =
- new AlterPipeProcedureV2(
- new TAlterPipeReq("testPipe", processorAttributes,
connectorAttributes, false, true));
+ TAlterPipeReq req =
+ new TAlterPipeReq("testPipe", processorAttributes,
connectorAttributes, false, true);
+ req.setExtractorAttributes(extractorAttributes);
+ req.setIsReplaceAllExtractorAttributes(false);
+ AlterPipeProcedureV2 proc = new AlterPipeProcedureV2(req);
try {
proc.serialize(outputStream);
ByteBuffer buffer =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
- AlterPipeProcedureV2 proc2 =
+ AlterPipeProcedureV2 proc3 =
(AlterPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
- assertEquals(proc, proc2);
+ assertEquals(proc, proc3);
} catch (Exception e) {
fail();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index dffec9f9dce..e655d1fc71d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1789,6 +1789,10 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
// Validate pipe plugin before alteration - only validate replace mode
final String pipeName = alterPipeStatement.getPipeName();
try {
+ if (!alterPipeStatement.getExtractorAttributes().isEmpty()
+ && alterPipeStatement.isReplaceAllExtractorAttributes()) {
+
PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
+ }
if (!alterPipeStatement.getProcessorAttributes().isEmpty()
&& alterPipeStatement.isReplaceAllProcessorAttributes()) {
PipeDataNodeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
@@ -1814,6 +1818,8 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
alterPipeStatement.getConnectorAttributes(),
alterPipeStatement.isReplaceAllProcessorAttributes(),
alterPipeStatement.isReplaceAllConnectorAttributes());
+ req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes());
+
req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes());
final TSStatus tsStatus = configNodeClient.alterPipe(req);
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
LOGGER.warn("Failed to alter pipe {} in config node, status is {}.",
pipeName, tsStatus);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index f0efaa2fc4a..dfc2ee7964b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3730,6 +3730,18 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
throw new SemanticException(
"Not support for this sql in ALTER PIPE, please enter pipe name.");
}
+
+ if (ctx.alterExtractorAttributesClause() != null) {
+ alterPipeStatement.setExtractorAttributes(
+ parseExtractorAttributesClause(
+
ctx.alterExtractorAttributesClause().extractorAttributeClause()));
+ alterPipeStatement.setReplaceAllExtractorAttributes(
+ Objects.nonNull(ctx.alterExtractorAttributesClause().REPLACE()));
+ } else {
+ alterPipeStatement.setExtractorAttributes(new HashMap<>());
+ alterPipeStatement.setReplaceAllExtractorAttributes(false);
+ }
+
if (ctx.alterProcessorAttributesClause() != null) {
alterPipeStatement.setProcessorAttributes(
parseProcessorAttributesClause(
@@ -3740,6 +3752,7 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
alterPipeStatement.setProcessorAttributes(new HashMap<>());
alterPipeStatement.setReplaceAllProcessorAttributes(false);
}
+
if (ctx.alterConnectorAttributesClause() != null) {
alterPipeStatement.setConnectorAttributes(
parseConnectorAttributesClause(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 644d26ba0bb..0c0bef6c274 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -37,8 +37,10 @@ import java.util.Map;
public class AlterPipeStatement extends Statement implements IConfigStatement {
private String pipeName;
+ private Map<String, String> extractorAttributes;
private Map<String, String> processorAttributes;
private Map<String, String> connectorAttributes;
+ private boolean isReplaceAllExtractorAttributes;
private boolean isReplaceAllProcessorAttributes;
private boolean isReplaceAllConnectorAttributes;
@@ -50,6 +52,10 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
return pipeName;
}
+ public Map<String, String> getExtractorAttributes() {
+ return extractorAttributes;
+ }
+
public Map<String, String> getProcessorAttributes() {
return processorAttributes;
}
@@ -58,6 +64,10 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
return connectorAttributes;
}
+ public boolean isReplaceAllExtractorAttributes() {
+ return isReplaceAllExtractorAttributes;
+ }
+
public boolean isReplaceAllProcessorAttributes() {
return isReplaceAllProcessorAttributes;
}
@@ -70,6 +80,10 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
this.pipeName = pipeName;
}
+ public void setExtractorAttributes(Map<String, String> extractorAttributes) {
+ this.extractorAttributes = extractorAttributes;
+ }
+
public void setProcessorAttributes(Map<String, String> processorAttributes) {
this.processorAttributes = processorAttributes;
}
@@ -78,6 +92,10 @@ public class AlterPipeStatement extends Statement implements
IConfigStatement {
this.connectorAttributes = connectorAttributes;
}
+ public void setReplaceAllExtractorAttributes(boolean
replaceAllExtractorAttributes) {
+ isReplaceAllExtractorAttributes = replaceAllExtractorAttributes;
+ }
+
public void setReplaceAllProcessorAttributes(boolean
replaceAllProcessorAttributes) {
isReplaceAllProcessorAttributes = replaceAllProcessorAttributes;
}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 73663b5789e..4dcce1c49ff 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -717,6 +717,8 @@ struct TAlterPipeReq {
3: required map<string, string> connectorAttributes
4: required bool isReplaceAllProcessorAttributes
5: required bool isReplaceAllConnectorAttributes
+ 6: optional map<string, string> extractorAttributes
+ 7: optional bool isReplaceAllExtractorAttributes
}
// Deprecated, restored for compatibility