This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 71a4ecb642f Pipe: support alter pipe source (#12932)
71a4ecb642f is described below

commit 71a4ecb642f3f292a33ef3d9a5a8f91bf01d47f0
Author: Zhenyu Luo <[email protected]>
AuthorDate: Wed Jul 17 19:31:50 2024 +0800

    Pipe: support alter pipe source (#12932)
---
 .../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 183 +++++++++++++++++++--
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |   8 +
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  15 ++
 .../impl/pipe/task/AlterPipeProcedureV2.java       |  52 +++++-
 .../procedure/store/ProcedureFactory.java          |   5 +-
 .../confignode/procedure/store/ProcedureType.java  |   1 +
 .../impl/pipe/task/AlterPipeProcedureV2Test.java   |  10 +-
 ...reV2Test.java => AlterPipeProcedureV3Test.java} |  19 ++-
 .../config/executor/ClusterConfigTaskExecutor.java |   6 +
 .../db/queryengine/plan/parser/ASTVisitor.java     |  13 ++
 .../metadata/pipe/AlterPipeStatement.java          |  18 ++
 .../src/main/thrift/confignode.thrift              |   2 +
 12 files changed, 306 insertions(+), 26 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index 2de8454eb51..fb541e7dcdb 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -53,7 +53,7 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualAutoIT {
     // Create pipe
     final String sql =
         String.format(
-            "create pipe a2b with processor 
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
+            "create pipe a2b with source ('source'='iotdb-source', 
'source.pattern'='root.test1', 'source.realtime.mode'='stream') with processor 
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
             receiverDataNode.getIpAndPortString());
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
@@ -71,6 +71,10 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualAutoIT 
{
       // Check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
       // Check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test1"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
       Assert.assertTrue(
           
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
       Assert.assertTrue(
@@ -99,6 +103,76 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
     }
 
+    // Alter pipe (modify)
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b modify source 
('source.pattern'='root.test2')");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // Show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // Check status
+      Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
+      // Check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test2"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // Check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // Check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // Alter pipe (replace)
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          "alter pipe a2b replace source ('source'='iotdb-source', 
'source.path'='root.test1.**')");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // Show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // check status
+      Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
+      // check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test2"));
+      Assert.assertFalse(
+          
showPipeResult.get(0).pipeExtractor.contains("source.realtime.mode=stream"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // Check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
     // Alter pipe (modify)
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
@@ -115,6 +189,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       // Check status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
       // Check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
       Assert.assertTrue(
           
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
@@ -155,6 +231,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       // check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
       // check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
       Assert.assertTrue(
           showPipeResult
               .get(0)
@@ -189,6 +267,80 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       // Check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
       // Check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeProcessor
+              .contains("processor=tumbling-time-sampling-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // Check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // Check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // Alter pipe (modify empty)
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b modify source ()");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeProcessor
+              .contains("processor=tumbling-time-sampling-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // Check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // Check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // Alter pipe (replace empty)
+    try (final Connection connection = senderEnv.getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b replace source ()");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (final SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      final List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // check configurations
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
       Assert.assertTrue(
           showPipeResult
@@ -223,6 +375,8 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
       // check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
       // check configurations
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
+      
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source.path=root.test1.**"));
       
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
       Assert.assertFalse(
           showPipeResult
@@ -305,13 +459,13 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
   }
 
   @Test
-  public void testAlterPipeProcessor() {
+  public void testAlterPipeSourceAndProcessor() {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     // Create pipe
     final String sql =
         String.format(
-            "create pipe a2b with processor 
('processor'='tumbling-time-sampling-processor', 
'processor.tumbling-time.interval-seconds'='1', 
'processor.down-sampling.split-file'='true') with sink ('node-urls'='%s', 
'batch.enable'='false')",
+            "create pipe a2b with source ('source' = 
'iotdb-source','source.path' = 'root.db.d1.**') with processor 
('processor'='tumbling-time-sampling-processor', 
'processor.tumbling-time.interval-seconds'='1', 
'processor.down-sampling.split-file'='true') with sink ('node-urls'='%s', 
'batch.enable'='false')",
             receiverDataNode.getIpAndPortString());
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
@@ -335,17 +489,26 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
     expectedResSet.add("2000,3.0,");
     expectedResSet.add("3000,5.0,");
     TestUtils.assertDataEventuallyOnEnv(
-        receiverEnv, "select * from root.**", "Time,root.db.d1.at1,", 
expectedResSet);
+        receiverEnv, "select * from root.db.**", "Time,root.db.d1.at1,", 
expectedResSet);
 
-    // Alter pipe (modify 'processor.tumbling-time.interval-seconds')
+    // Alter pipe (modify 'source.path' and 
'processor.tumbling-time.interval-seconds')
     try (final Connection connection = senderEnv.getConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute(
-          "alter pipe a2b modify processor 
('processor.tumbling-time.interval-seconds'='2')");
+          "alter pipe a2b modify source('source' = 
'iotdb-source','source.path'='root.db.d2.**') modify processor 
('processor.tumbling-time.interval-seconds'='2')");
     } catch (SQLException e) {
       fail(e.getMessage());
     }
 
+    // Insert data on sender
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        senderEnv,
+        Arrays.asList(
+            "insert into root.db.d2 (time, at1) values (11000, 1), (11500, 2), 
(12000, 3), (12500, 4), (13000, 5)",
+            "flush"))) {
+      fail();
+    }
+
     // Insert data on sender
     if (!TestUtils.tryExecuteNonQueriesWithRetry(
         senderEnv,
@@ -357,12 +520,12 @@ public class IoTDBPipeAlterIT extends 
AbstractPipeDualAutoIT {
 
     // Check data on receiver
     expectedResSet.clear();
-    expectedResSet.add("11000,1.0,");
-    expectedResSet.add("13000,5.0,");
+    expectedResSet.add("11000,null,1.0,");
+    expectedResSet.add("13000,null,5.0,");
     TestUtils.assertDataEventuallyOnEnv(
         receiverEnv,
-        "select * from root.** where time > 10000",
-        "Time,root.db.d1.at1,",
+        "select * from root.db.** where time > 10000",
+        "Time,root.db.d1.at1,root.db.d2.at1,",
         expectedResSet);
   }
 }
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 6b662cbe50e..e66c875c9a7 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -576,10 +576,18 @@ connectorAttributeClause
 
 alterPipe
     : ALTER PIPE pipeName=identifier
+        alterExtractorAttributesClause?
         alterProcessorAttributesClause?
         alterConnectorAttributesClause?
     ;
 
+alterExtractorAttributesClause
+    : (MODIFY | REPLACE) (EXTRACTOR | SOURCE)
+        LR_BRACKET
+        (extractorAttributeClause COMMA)* extractorAttributeClause?
+        RR_BRACKET
+    ;
+
 alterProcessorAttributesClause
     : (MODIFY | REPLACE) PROCESSOR
         LR_BRACKET
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index ee864076a5b..c5a264c047a 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -213,6 +213,20 @@ public class PipeTaskInfo implements SnapshotProcessor {
     //   1.1. if they are empty, the original attributes are filled directly.
     //   1.2. Otherwise, corresponding updates on original attributes are 
performed.
     // 2. In replace mode, do nothing here.
+    if (!alterPipeRequest.isReplaceAllExtractorAttributes) { // modify mode
+      if (alterPipeRequest.getExtractorAttributes().isEmpty()) {
+        alterPipeRequest.setExtractorAttributes(
+            
copiedPipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute());
+      } else {
+        alterPipeRequest.setExtractorAttributes(
+            copiedPipeStaticMetaFromCoordinator
+                .getExtractorParameters()
+                .addOrReplaceEquivalentAttributes(
+                    new 
PipeParameters(alterPipeRequest.getExtractorAttributes()))
+                .getAttribute());
+      }
+    }
+
     if (!alterPipeRequest.isReplaceAllProcessorAttributes) { // modify mode
       if (alterPipeRequest.getProcessorAttributes().isEmpty()) {
         alterPipeRequest.setProcessorAttributes(
@@ -226,6 +240,7 @@ public class PipeTaskInfo implements SnapshotProcessor {
                 .getAttribute());
       }
     }
+
     if (!alterPipeRequest.isReplaceAllConnectorAttributes) { // modify mode
       if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
         alterPipeRequest.setConnectorAttributes(
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 6cccbbd7033..e59eb3a1896 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.task.meta.PipeStaticMeta;
 import org.apache.iotdb.commons.pipe.task.meta.PipeStatus;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import 
org.apache.iotdb.confignode.consensus.request.write.pipe.task.AlterPipePlanV2;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
@@ -62,13 +63,25 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   private PipeRuntimeMeta currentPipeRuntimeMeta;
   private PipeRuntimeMeta updatedPipeRuntimeMeta;
 
-  public AlterPipeProcedureV2() {
+  private ProcedureType procedureType;
+
+  public AlterPipeProcedureV2(ProcedureType procedureType) {
     super();
+    this.procedureType = procedureType;
   }
 
   public AlterPipeProcedureV2(TAlterPipeReq alterPipeRequest) throws 
PipeException {
     super();
     this.alterPipeRequest = alterPipeRequest;
+    procedureType = ProcedureType.ALTER_PIPE_PROCEDURE_V3;
+  }
+
+  @TestOnly
+  public AlterPipeProcedureV2(TAlterPipeReq alterPipeRequest, ProcedureType 
procedureType)
+      throws PipeException {
+    super();
+    this.alterPipeRequest = alterPipeRequest;
+    this.procedureType = procedureType;
   }
 
   @Override
@@ -90,7 +103,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         .getPipePluginCoordinator()
         .getPipePluginInfo()
         .checkPipePluginExistence(
-            new HashMap<>(), // no need to check pipe source plugin
+            alterPipeRequest.getExtractorAttributes(),
             alterPipeRequest.getProcessorAttributes(),
             alterPipeRequest.getConnectorAttributes());
 
@@ -116,10 +129,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         new PipeStaticMeta(
             alterPipeRequest.getPipeName(),
             System.currentTimeMillis(),
-            new HashMap<>(
-                currentPipeStaticMeta
-                    .getExtractorParameters()
-                    .getAttribute()), // reuse pipe source plugin
+            new HashMap<>(alterPipeRequest.getExtractorAttributes()),
             new HashMap<>(alterPipeRequest.getProcessorAttributes()),
             new HashMap<>(alterPipeRequest.getConnectorAttributes()));
 
@@ -259,7 +269,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
 
   @Override
   public void serialize(DataOutputStream stream) throws IOException {
-    stream.writeShort(ProcedureType.ALTER_PIPE_PROCEDURE_V2.getTypeCode());
+    stream.writeShort(procedureType.getTypeCode());
     super.serialize(stream);
     ReadWriteIOUtils.write(alterPipeRequest.getPipeName(), stream);
     ReadWriteIOUtils.write(alterPipeRequest.getProcessorAttributesSize(), 
stream);
@@ -298,6 +308,15 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     } else {
       ReadWriteIOUtils.write(false, stream);
     }
+
+    if (procedureType.getTypeCode() == 
ProcedureType.ALTER_PIPE_PROCEDURE_V3.getTypeCode()) {
+      ReadWriteIOUtils.write(alterPipeRequest.getExtractorAttributesSize(), 
stream);
+      for (Map.Entry<String, String> entry : 
alterPipeRequest.getExtractorAttributes().entrySet()) {
+        ReadWriteIOUtils.write(entry.getKey(), stream);
+        ReadWriteIOUtils.write(entry.getValue(), stream);
+      }
+      ReadWriteIOUtils.write(alterPipeRequest.isReplaceAllExtractorAttributes, 
stream);
+    }
   }
 
   @Override
@@ -306,8 +325,10 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     alterPipeRequest =
         new TAlterPipeReq()
             .setPipeName(ReadWriteIOUtils.readString(byteBuffer))
+            .setExtractorAttributes(new HashMap<>())
             .setProcessorAttributes(new HashMap<>())
             .setConnectorAttributes(new HashMap<>());
+
     int size = ReadWriteIOUtils.readInt(byteBuffer);
     for (int i = 0; i < size; ++i) {
       alterPipeRequest
@@ -334,6 +355,18 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     if (ReadWriteIOUtils.readBool(byteBuffer)) {
       updatedPipeRuntimeMeta = PipeRuntimeMeta.deserialize(byteBuffer);
     }
+    if (procedureType.getTypeCode() == 
ProcedureType.ALTER_PIPE_PROCEDURE_V3.getTypeCode()) {
+      size = ReadWriteIOUtils.readInt(byteBuffer);
+      for (int i = 0; i < size; ++i) {
+        alterPipeRequest
+            .getExtractorAttributes()
+            .put(ReadWriteIOUtils.readString(byteBuffer), 
ReadWriteIOUtils.readString(byteBuffer));
+      }
+      alterPipeRequest.isReplaceAllExtractorAttributes = 
ReadWriteIOUtils.readBool((byteBuffer));
+    } else {
+      alterPipeRequest.setExtractorAttributes(new HashMap<>());
+      alterPipeRequest.isReplaceAllExtractorAttributes = false;
+    }
   }
 
   @Override
@@ -346,6 +379,10 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     }
     AlterPipeProcedureV2 that = (AlterPipeProcedureV2) o;
     return 
this.alterPipeRequest.getPipeName().equals(that.alterPipeRequest.getPipeName())
+        && this.alterPipeRequest
+            .getExtractorAttributes()
+            .toString()
+            .equals(that.alterPipeRequest.getExtractorAttributes().toString())
         && this.alterPipeRequest
             .getProcessorAttributes()
             .toString()
@@ -360,6 +397,7 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
   public int hashCode() {
     return Objects.hash(
         alterPipeRequest.getPipeName(),
+        alterPipeRequest.getExtractorAttributes(),
         alterPipeRequest.getProcessorAttributes(),
         alterPipeRequest.getConnectorAttributes());
   }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index 804f6dcc2a1..4a7c9d008cf 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -153,7 +153,10 @@ public class ProcedureFactory implements IProcedureFactory 
{
         procedure = new DropPipeProcedureV2();
         break;
       case ALTER_PIPE_PROCEDURE_V2:
-        procedure = new AlterPipeProcedureV2();
+        procedure = new 
AlterPipeProcedureV2(ProcedureType.ALTER_PIPE_PROCEDURE_V2);
+        break;
+      case ALTER_PIPE_PROCEDURE_V3:
+        procedure = new 
AlterPipeProcedureV2(ProcedureType.ALTER_PIPE_PROCEDURE_V3);
         break;
       case PIPE_HANDLE_LEADER_CHANGE_PROCEDURE:
         procedure = new PipeHandleLeaderChangeProcedure();
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index a781b54fc8a..683365b1dfc 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -74,6 +74,7 @@ public enum ProcedureType {
   STOP_PIPE_PROCEDURE_V2((short) 1002),
   DROP_PIPE_PROCEDURE_V2((short) 1003),
   ALTER_PIPE_PROCEDURE_V2((short) 1004),
+  ALTER_PIPE_PROCEDURE_V3((short) 1005),
 
   /** Pipe Runtime */
   PIPE_HANDLE_LEADER_CHANGE_PROCEDURE((short) 1100),
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
index 1d53a1a72b5..cf51ad98780 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.impl.pipe.task;
 
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
 
 import org.apache.tsfile.utils.PublicBAOS;
@@ -39,16 +40,21 @@ public class AlterPipeProcedureV2Test {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
 
+    Map<String, String> extractorAttributes = new HashMap<>();
     Map<String, String> processorAttributes = new HashMap<>();
     Map<String, String> connectorAttributes = new HashMap<>();
+
     processorAttributes.put("processor", "do-nothing-processor");
     connectorAttributes.put("connector", "iotdb-thrift-connector");
     connectorAttributes.put("host", "127.0.0.1");
     connectorAttributes.put("port", "6667");
 
+    TAlterPipeReq req =
+        new TAlterPipeReq("testPipe", processorAttributes, 
connectorAttributes, false, true);
+    req.setExtractorAttributes(extractorAttributes);
+    req.setIsReplaceAllExtractorAttributes(false);
     AlterPipeProcedureV2 proc =
-        new AlterPipeProcedureV2(
-            new TAlterPipeReq("testPipe", processorAttributes, 
connectorAttributes, false, true));
+        new AlterPipeProcedureV2(req, ProcedureType.ALTER_PIPE_PROCEDURE_V2);
 
     try {
       proc.serialize(outputStream);
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV3Test.java
similarity index 78%
copy from 
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
copy to 
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV3Test.java
index 1d53a1a72b5..3e1c460f855 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV3Test.java
@@ -33,31 +33,38 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-public class AlterPipeProcedureV2Test {
+public class AlterPipeProcedureV3Test {
+
   @Test
   public void serializeDeserializeTest() {
     PublicBAOS byteArrayOutputStream = new PublicBAOS();
     DataOutputStream outputStream = new 
DataOutputStream(byteArrayOutputStream);
 
+    Map<String, String> extractorAttributes = new HashMap<>();
     Map<String, String> processorAttributes = new HashMap<>();
     Map<String, String> connectorAttributes = new HashMap<>();
+
+    extractorAttributes.put("source", "iotdb-source");
+    extractorAttributes.put("source.pattern", "root.test1.wf01");
     processorAttributes.put("processor", "do-nothing-processor");
     connectorAttributes.put("connector", "iotdb-thrift-connector");
     connectorAttributes.put("host", "127.0.0.1");
     connectorAttributes.put("port", "6667");
 
-    AlterPipeProcedureV2 proc =
-        new AlterPipeProcedureV2(
-            new TAlterPipeReq("testPipe", processorAttributes, 
connectorAttributes, false, true));
+    TAlterPipeReq req =
+        new TAlterPipeReq("testPipe", processorAttributes, 
connectorAttributes, false, true);
+    req.setExtractorAttributes(extractorAttributes);
+    req.setIsReplaceAllExtractorAttributes(false);
+    AlterPipeProcedureV2 proc = new AlterPipeProcedureV2(req);
 
     try {
       proc.serialize(outputStream);
       ByteBuffer buffer =
           ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, 
byteArrayOutputStream.size());
-      AlterPipeProcedureV2 proc2 =
+      AlterPipeProcedureV2 proc3 =
           (AlterPipeProcedureV2) ProcedureFactory.getInstance().create(buffer);
 
-      assertEquals(proc, proc2);
+      assertEquals(proc, proc3);
     } catch (Exception e) {
       fail();
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index dffec9f9dce..e655d1fc71d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1789,6 +1789,10 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
     // Validate pipe plugin before alteration - only validate replace mode
     final String pipeName = alterPipeStatement.getPipeName();
     try {
+      if (!alterPipeStatement.getExtractorAttributes().isEmpty()
+          && alterPipeStatement.isReplaceAllExtractorAttributes()) {
+        PipeDataNodeAgent.plugin().validateExtractor(alterPipeStatement.getExtractorAttributes());
+      }
       if (!alterPipeStatement.getProcessorAttributes().isEmpty()
           && alterPipeStatement.isReplaceAllProcessorAttributes()) {
         PipeDataNodeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
@@ -1814,6 +1818,8 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
               alterPipeStatement.getConnectorAttributes(),
               alterPipeStatement.isReplaceAllProcessorAttributes(),
               alterPipeStatement.isReplaceAllConnectorAttributes());
+      req.setExtractorAttributes(alterPipeStatement.getExtractorAttributes());
+      req.setIsReplaceAllExtractorAttributes(alterPipeStatement.isReplaceAllExtractorAttributes());
       final TSStatus tsStatus = configNodeClient.alterPipe(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
         LOGGER.warn("Failed to alter pipe {} in config node, status is {}.", pipeName, tsStatus);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index f0efaa2fc4a..dfc2ee7964b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -3730,6 +3730,18 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       throw new SemanticException(
           "Not support for this sql in ALTER PIPE, please enter pipe name.");
     }
+
+    if (ctx.alterExtractorAttributesClause() != null) {
+      alterPipeStatement.setExtractorAttributes(
+          parseExtractorAttributesClause(
+              ctx.alterExtractorAttributesClause().extractorAttributeClause()));
+      alterPipeStatement.setReplaceAllExtractorAttributes(
+          Objects.nonNull(ctx.alterExtractorAttributesClause().REPLACE()));
+    } else {
+      alterPipeStatement.setExtractorAttributes(new HashMap<>());
+      alterPipeStatement.setReplaceAllExtractorAttributes(false);
+    }
+
     if (ctx.alterProcessorAttributesClause() != null) {
       alterPipeStatement.setProcessorAttributes(
           parseProcessorAttributesClause(
@@ -3740,6 +3752,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
       alterPipeStatement.setProcessorAttributes(new HashMap<>());
       alterPipeStatement.setReplaceAllProcessorAttributes(false);
     }
+
     if (ctx.alterConnectorAttributesClause() != null) {
       alterPipeStatement.setConnectorAttributes(
           parseConnectorAttributesClause(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 644d26ba0bb..0c0bef6c274 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -37,8 +37,10 @@ import java.util.Map;
 public class AlterPipeStatement extends Statement implements IConfigStatement {
 
   private String pipeName;
+  private Map<String, String> extractorAttributes;
   private Map<String, String> processorAttributes;
   private Map<String, String> connectorAttributes;
+  private boolean isReplaceAllExtractorAttributes;
   private boolean isReplaceAllProcessorAttributes;
   private boolean isReplaceAllConnectorAttributes;
 
@@ -50,6 +52,10 @@ public class AlterPipeStatement extends Statement implements IConfigStatement {
     return pipeName;
   }
 
+  public Map<String, String> getExtractorAttributes() {
+    return extractorAttributes;
+  }
+
   public Map<String, String> getProcessorAttributes() {
     return processorAttributes;
   }
@@ -58,6 +64,10 @@ public class AlterPipeStatement extends Statement implements IConfigStatement {
     return connectorAttributes;
   }
 
+  public boolean isReplaceAllExtractorAttributes() {
+    return isReplaceAllExtractorAttributes;
+  }
+
   public boolean isReplaceAllProcessorAttributes() {
     return isReplaceAllProcessorAttributes;
   }
@@ -70,6 +80,10 @@ public class AlterPipeStatement extends Statement implements IConfigStatement {
     this.pipeName = pipeName;
   }
 
+  public void setExtractorAttributes(Map<String, String> extractorAttributes) {
+    this.extractorAttributes = extractorAttributes;
+  }
+
   public void setProcessorAttributes(Map<String, String> processorAttributes) {
     this.processorAttributes = processorAttributes;
   }
@@ -78,6 +92,10 @@ public class AlterPipeStatement extends Statement implements IConfigStatement {
     this.connectorAttributes = connectorAttributes;
   }
 
+  public void setReplaceAllExtractorAttributes(boolean replaceAllExtractorAttributes) {
+    isReplaceAllExtractorAttributes = replaceAllExtractorAttributes;
+  }
+
   public void setReplaceAllProcessorAttributes(boolean replaceAllProcessorAttributes) {
     isReplaceAllProcessorAttributes = replaceAllProcessorAttributes;
   }
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 73663b5789e..4dcce1c49ff 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -717,6 +717,8 @@ struct TAlterPipeReq {
     3: required map<string, string> connectorAttributes
     4: required bool isReplaceAllProcessorAttributes
     5: required bool isReplaceAllConnectorAttributes
+    6: optional map<string, string> extractorAttributes
+    7: optional bool isReplaceAllExtractorAttributes
 }
 
 // Deprecated, restored for compatibility


Reply via email to