This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fd4337eb0af Pipe: support replace and modify mode for alter pipe sql 
(#12018)
fd4337eb0af is described below

commit fd4337eb0af18000babaaf3f16a6643b520fd3b8
Author: V_Galaxy <[email protected]>
AuthorDate: Tue Feb 6 10:47:59 2024 +0800

    Pipe: support replace and modify mode for alter pipe sql (#12018)
---
 .../org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java | 243 +++++++++++++++++++--
 .../api/customizer/parameter/PipeParameters.java   |  39 ++--
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  12 +-
 .../confignode/persistence/pipe/PipeTaskInfo.java  |  73 ++++---
 .../impl/pipe/task/AlterPipeProcedureV2.java       |  16 +-
 .../impl/pipe/task/AlterPipeProcedureV2Test.java   |   5 +-
 .../config/executor/ClusterConfigTaskExecutor.java |  28 +--
 .../db/queryengine/plan/parser/ASTVisitor.java     |  15 +-
 .../metadata/pipe/AlterPipeStatement.java          |  18 ++
 .../src/main/thrift/confignode.thrift              |   6 +-
 10 files changed, 352 insertions(+), 103 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java
index a5534b32411..dfed53ab9c1 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/IoTDBPipeAlterIT.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.pipe.it;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
+import org.apache.iotdb.db.it.utils.TestUtils;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2;
@@ -34,7 +35,10 @@ import org.junit.runner.RunWith;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.fail;
 
@@ -43,13 +47,13 @@ import static org.junit.Assert.fail;
 public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
 
   @Test
-  public void testBasicPipeAlter() throws Exception {
+  public void testBasicAlterPipe() throws Exception {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     // create pipe
     String sql =
         String.format(
-            "create pipe a2b with sink ('node-urls'='%s', 
'batch.enable'='false')",
+            "create pipe a2b with processor 
('processor'='do-nothing-processor') with sink ('node-urls'='%s')",
             receiverDataNode.getIpAndPortString());
     try (Connection connection = senderEnv.getConnection();
         Statement statement = connection.createStatement()) {
@@ -58,15 +62,24 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
       fail(e.getMessage());
     }
 
-    // show pipe and record first creation time
-    long firstCreationTime;
+    // show pipe
+    long lastCreationTime;
     try (SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
-      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
+      // check status
       Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
-      firstCreationTime = showPipeResult.get(0).creationTime;
+      // check configurations
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // record last creation time
+      lastCreationTime = showPipeResult.get(0).creationTime;
     }
 
     // stop pipe
@@ -82,17 +95,14 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
+      // check status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
     }
 
-    // alter pipe
-    sql =
-        String.format(
-            "alter pipe a2b modify sink ('node-urls'='%s', 
'batch.enable'='true')",
-            receiverDataNode.getIpAndPortString());
+    // alter pipe (modify)
     try (Connection connection = senderEnv.getConnection();
         Statement statement = connection.createStatement()) {
-      statement.execute(sql);
+      statement.execute("alter pipe a2b modify sink 
('sink.batch.enable'='false')");
     } catch (SQLException e) {
       fail(e.getMessage());
     }
@@ -102,17 +112,159 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT 
{
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
       List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
       Assert.assertEquals(1, showPipeResult.size());
-      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      // check status
       Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
-      // alter pipe will reset pipe creation time
-      Assert.assertTrue(showPipeResult.get(0).creationTime > 
firstCreationTime);
-      // alter pipe will clear exception messages
+      // check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=do-nothing-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // start pipe
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("start pipe a2b");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // alter pipe (replace)
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b replace processor 
('processor'='down-sampling-processor')");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // check configurations
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=false"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // alter pipe (modify)
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b modify sink 
('connector.batch.enable'='true')");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      Assert.assertTrue(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // alter pipe (replace empty)
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b replace processor ()");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      Assert.assertFalse(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // check exception message
+      Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
+    }
+
+    // alter pipe (modify empty)
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b modify sink ()");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // show pipe
+    try (SyncConfigNodeIServiceClient client =
+        (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+      List<TShowPipeInfo> showPipeResult = client.showPipe(new 
TShowPipeReq()).pipeInfoList;
+      Assert.assertEquals(1, showPipeResult.size());
+      // check status
+      Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
+      // check configurations
+      
Assert.assertTrue(showPipeResult.get(0).pipeConnector.contains("batch.enable=true"));
+      Assert.assertFalse(
+          
showPipeResult.get(0).pipeProcessor.contains("processor=down-sampling-processor"));
+      Assert.assertTrue(
+          showPipeResult
+              .get(0)
+              .pipeConnector
+              .contains(String.format("node-urls=%s", 
receiverDataNode.getIpAndPortString())));
+      // check creation time and record last creation time
+      Assert.assertTrue(showPipeResult.get(0).creationTime > lastCreationTime);
+      lastCreationTime = showPipeResult.get(0).creationTime;
+      // check exception message
       Assert.assertEquals("", showPipeResult.get(0).exceptionMessage);
     }
   }
 
   @Test
-  public void testPipeAlterFailure() {
+  public void testAlterPipeFailure() {
     DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
 
     // alter non-existed pipe
@@ -138,17 +290,66 @@ public class IoTDBPipeAlterIT extends AbstractPipeDualIT {
     } catch (SQLException e) {
       fail(e.getMessage());
     }
+  }
 
-    // useless alter
-    sql =
+  @Test
+  public void testAlterPipeProcessor() {
+    DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+    // create pipe
+    String sql =
         String.format(
-            "alter pipe a2b modify sink ('node-urls'='%s', 
'batch.enable'='false')",
+            "create pipe a2b with processor 
('processor'='down-sampling-processor', 'down-sampling.interval-seconds'='1', 
'down-sampling.split-file'='true') with sink ('node-urls'='%s', 
'batch.enable'='false')",
             receiverDataNode.getIpAndPortString());
     try (Connection connection = senderEnv.getConnection();
         Statement statement = connection.createStatement()) {
       statement.execute(sql);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // insert data on sender
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        senderEnv,
+        Arrays.asList(
+            "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2), 
(2000, 3), (2500, 4), (3000, 5)",
+            "flush"))) {
+      fail();
+    }
+
+    // check data on receiver
+    Set<String> expectedResSet = new HashSet<>();
+    expectedResSet.add("1000,1.0,");
+    expectedResSet.add("2000,3.0,");
+    expectedResSet.add("3000,5.0,");
+    TestUtils.assertDataOnEnv(
+        receiverEnv, "select * from root.**", "Time,root.db.d1.at1,", 
expectedResSet);
+
+    // alter pipe (modify 'down-sampling.interval-seconds')
+    try (Connection connection = senderEnv.getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("alter pipe a2b modify processor 
('down-sampling.interval-seconds'='2')");
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+
+    // insert data on sender
+    if (!TestUtils.tryExecuteNonQueriesWithRetry(
+        senderEnv,
+        Arrays.asList(
+            "insert into root.db.d1 (time, at1) values (11000, 1), (11500, 2), 
(12000, 3), (12500, 4), (13000, 5)",
+            "flush"))) {
       fail();
-    } catch (SQLException ignore) {
     }
+
+    // check data on receiver
+    expectedResSet.clear();
+    expectedResSet.add("11000,1.0,");
+    expectedResSet.add("13000,5.0,");
+    TestUtils.assertDataOnEnv(
+        receiverEnv,
+        "select * from root.** where time > 10000",
+        "Time,root.db.d1.at1,",
+        expectedResSet);
   }
 }
diff --git 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index aee06624f7f..3bf0f0bb688 100644
--- 
a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ 
b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 /**
@@ -281,30 +280,25 @@ public class PipeParameters {
   }
 
   /**
-   * This method uses {@link KeyReducer} to reduce keys and then sorts them to 
determine if two
-   * PipeParameters are equivalent.
+   * This method adds (non-existed) or replaces (existed) equivalent 
attributes in this
+   * PipeParameters with those from another PipeParameters.
+   *
+   * @param that provide the key that needs to be updated along with the value
+   * @return this pipe parameters
    */
-  public boolean isEquivalent(Object obj) {
-    if (this == obj) {
-      return true;
-    }
-    if (obj == null || getClass() != obj.getClass()) {
-      return false;
-    }
-    PipeParameters that = (PipeParameters) obj;
-    return Objects.equals(
+  public PipeParameters addOrReplaceEquivalentAttributes(PipeParameters that) {
+    Map<String, Entry<String, String>> thisMap =
         this.attributes.entrySet().stream()
-            .collect(
-                Collectors.toMap(
-                    entry -> KeyReducer.reduce(entry.getKey()), 
Entry::getValue,
-                    (oldValue, newValue) -> oldValue, TreeMap::new))
-            .toString(),
+            .collect(Collectors.toMap(entry -> 
KeyReducer.reduce(entry.getKey()), entry -> entry));
+    Map<String, Entry<String, String>> thatMap =
         that.attributes.entrySet().stream()
-            .collect(
-                Collectors.toMap(
-                    entry -> KeyReducer.reduce(entry.getKey()), 
Entry::getValue,
-                    (oldValue, newValue) -> oldValue, TreeMap::new))
-            .toString());
+            .collect(Collectors.toMap(entry -> 
KeyReducer.reduce(entry.getKey()), entry -> entry));
+    thatMap.forEach(
+        (key, entry) -> {
+          this.attributes.remove(thisMap.getOrDefault(key, entry).getKey());
+          this.attributes.put(entry.getKey(), entry.getValue());
+        });
+    return this;
   }
 
   private static class KeyReducer {
@@ -334,6 +328,7 @@ public class PipeParameters {
   }
 
   public static class ValueHider {
+
     private static final Set<String> KEYS = new HashSet<>();
 
     private static final String PLACEHOLDER = "******";
diff --git 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index af9cbb8ee84..d64bbd4d1d6 100644
--- 
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ 
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -563,19 +563,19 @@ connectorAttributeClause
 
 alterPipe
     : ALTER PIPE pipeName=identifier
-        modifyProcessorAttributesClause?
-        modifyConnectorAttributesClause?
+        alterProcessorAttributesClause?
+        alterConnectorAttributesClause?
     ;
 
-modifyProcessorAttributesClause
-    : MODIFY PROCESSOR
+alterProcessorAttributesClause
+    : (MODIFY | REPLACE) PROCESSOR
         LR_BRACKET
         (processorAttributeClause COMMA)* processorAttributeClause?
         RR_BRACKET
     ;
 
-modifyConnectorAttributesClause
-    : MODIFY (CONNECTOR | SINK)
+alterConnectorAttributesClause
+    : (MODIFY | REPLACE) (CONNECTOR | SINK)
         LR_BRACKET
         (connectorAttributeClause COMMA)* connectorAttributeClause?
         RR_BRACKET
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
index 196852406c5..10085a49357 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/pipe/PipeTaskInfo.java
@@ -54,6 +54,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -162,16 +163,18 @@ public class PipeTaskInfo implements SnapshotProcessor {
     throw new PipeException(exceptionMessage);
   }
 
-  public void checkBeforeAlterPipe(TAlterPipeReq alterPipeRequest) throws 
PipeException {
+  public void checkAndUpdateRequestBeforeAlterPipe(TAlterPipeReq 
alterPipeRequest)
+      throws PipeException {
     acquireReadLock();
     try {
-      checkBeforeAlterPipeInternal(alterPipeRequest);
+      checkAndUpdateRequestBeforeAlterPipeInternal(alterPipeRequest);
     } finally {
       releaseReadLock();
     }
   }
 
-  private void checkBeforeAlterPipeInternal(TAlterPipeReq alterPipeRequest) 
throws PipeException {
+  private void checkAndUpdateRequestBeforeAlterPipeInternal(TAlterPipeReq 
alterPipeRequest)
+      throws PipeException {
     if (!isPipeExisted(alterPipeRequest.getPipeName())) {
       final String exceptionMessage =
           String.format(
@@ -180,30 +183,46 @@ public class PipeTaskInfo implements SnapshotProcessor {
       throw new PipeException(exceptionMessage);
     }
 
-    PipeMeta pipeMetaFromCoordinator = 
getPipeMetaByPipeName(alterPipeRequest.getPipeName());
-    PipeStaticMeta pipeStaticMetaFromCoordinator = 
pipeMetaFromCoordinator.getStaticMeta();
-    // fill empty attributes and check useless alter from the perspective of CN
-    boolean needToAlter = false;
-    if (alterPipeRequest.getProcessorAttributes().isEmpty()) {
-      alterPipeRequest.setProcessorAttributes(
-          
pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute());
-    } else if (!(new PipeParameters(alterPipeRequest.getProcessorAttributes()))
-        .isEquivalent(pipeStaticMetaFromCoordinator.getProcessorParameters())) 
{
-      needToAlter = true;
-    }
-    if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
-      alterPipeRequest.setConnectorAttributes(
-          
pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
-    } else if (!(new PipeParameters(alterPipeRequest.getConnectorAttributes())
-        
.isEquivalent(pipeStaticMetaFromCoordinator.getConnectorParameters()))) {
-      needToAlter = true;
-    }
-    if (!needToAlter) {
-      final String exceptionMessage =
-          String.format(
-              "Failed to alter pipe %s, nothing to alter", 
alterPipeRequest.getPipeName());
-      LOGGER.warn(exceptionMessage);
-      throw new PipeException(exceptionMessage);
+    PipeStaticMeta pipeStaticMetaFromCoordinator =
+        getPipeMetaByPipeName(alterPipeRequest.getPipeName()).getStaticMeta();
+    // deep copy current pipe static meta
+    PipeStaticMeta copiedPipeStaticMetaFromCoordinator =
+        new PipeStaticMeta(
+            pipeStaticMetaFromCoordinator.getPipeName(),
+            pipeStaticMetaFromCoordinator.getCreationTime(),
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getExtractorParameters().getAttribute()),
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute()),
+            new 
HashMap<>(pipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute()));
+
+    // 1. In modify mode, based on the passed attributes:
+    //   1.1. if they are empty, the original attributes are filled directly.
+    //   1.2. Otherwise, corresponding updates on original attributes are 
performed.
+    // 2. In replace mode, do nothing here.
+    if (!alterPipeRequest.isReplaceAllProcessorAttributes) { // modify mode
+      if (alterPipeRequest.getProcessorAttributes().isEmpty()) {
+        alterPipeRequest.setProcessorAttributes(
+            
copiedPipeStaticMetaFromCoordinator.getProcessorParameters().getAttribute());
+      } else {
+        alterPipeRequest.setProcessorAttributes(
+            copiedPipeStaticMetaFromCoordinator
+                .getProcessorParameters()
+                .addOrReplaceEquivalentAttributes(
+                    new 
PipeParameters(alterPipeRequest.getProcessorAttributes()))
+                .getAttribute());
+      }
+    }
+    if (!alterPipeRequest.isReplaceAllConnectorAttributes) { // modify mode
+      if (alterPipeRequest.getConnectorAttributes().isEmpty()) {
+        alterPipeRequest.setConnectorAttributes(
+            
copiedPipeStaticMetaFromCoordinator.getConnectorParameters().getAttribute());
+      } else {
+        alterPipeRequest.setConnectorAttributes(
+            copiedPipeStaticMetaFromCoordinator
+                .getConnectorParameters()
+                .addOrReplaceEquivalentAttributes(
+                    new 
PipeParameters(alterPipeRequest.getConnectorAttributes()))
+                .getAttribute());
+      }
     }
   }
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 4b378a255fd..601943d0bd8 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -80,15 +80,18 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
     LOGGER.info(
         "AlterPipeProcedureV2: executeFromValidateTask({})", 
alterPipeRequest.getPipeName());
 
+    // We should execute checkBeforeAlterPipe before checking the pipe plugin. 
This method will
+    // update the alterPipeRequest based on the alterPipeRequest and existing 
pipe metadata.
+    pipeTaskInfo.get().checkAndUpdateRequestBeforeAlterPipe(alterPipeRequest);
+
     final PipeManager pipeManager = env.getConfigManager().getPipeManager();
     pipeManager
         .getPipePluginCoordinator()
         .getPipePluginInfo()
         .checkPipePluginExistence(
-            new HashMap<>(),
+            new HashMap<>(), // no need to check pipe source plugin
             alterPipeRequest.getProcessorAttributes(),
             alterPipeRequest.getConnectorAttributes());
-    pipeTaskInfo.get().checkBeforeAlterPipe(alterPipeRequest);
 
     return false;
   }
@@ -112,7 +115,10 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
         new PipeStaticMeta(
             alterPipeRequest.getPipeName(),
             System.currentTimeMillis(),
-            new 
HashMap<>(currentPipeStaticMeta.getExtractorParameters().getAttribute()),
+            new HashMap<>(
+                currentPipeStaticMeta
+                    .getExtractorParameters()
+                    .getAttribute()), // reuse pipe source plugin
             new HashMap<>(alterPipeRequest.getProcessorAttributes()),
             new HashMap<>(alterPipeRequest.getConnectorAttributes()));
 
@@ -254,6 +260,8 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
       ReadWriteIOUtils.write(entry.getKey(), stream);
       ReadWriteIOUtils.write(entry.getValue(), stream);
     }
+    ReadWriteIOUtils.write(alterPipeRequest.isReplaceAllProcessorAttributes, 
stream);
+    ReadWriteIOUtils.write(alterPipeRequest.isReplaceAllConnectorAttributes, 
stream);
     if (currentPipeStaticMeta != null) {
       ReadWriteIOUtils.write(true, stream);
       currentPipeStaticMeta.serialize(stream);
@@ -300,6 +308,8 @@ public class AlterPipeProcedureV2 extends 
AbstractOperatePipeProcedureV2 {
           .getConnectorAttributes()
           .put(ReadWriteIOUtils.readString(byteBuffer), 
ReadWriteIOUtils.readString(byteBuffer));
     }
+    alterPipeRequest.isReplaceAllProcessorAttributes = 
ReadWriteIOUtils.readBool(byteBuffer);
+    alterPipeRequest.isReplaceAllConnectorAttributes = 
ReadWriteIOUtils.readBool(byteBuffer);
     if (ReadWriteIOUtils.readBool(byteBuffer)) {
       currentPipeStaticMeta = PipeStaticMeta.deserialize(byteBuffer);
     }
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
index 326fcc791bc..f23d22a1638 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2Test.java
@@ -48,10 +48,7 @@ public class AlterPipeProcedureV2Test {
 
     AlterPipeProcedureV2 proc =
         new AlterPipeProcedureV2(
-            new TAlterPipeReq()
-                .setPipeName("testPipe")
-                .setProcessorAttributes(processorAttributes)
-                .setConnectorAttributes(connectorAttributes));
+            new TAlterPipeReq("testPipe", processorAttributes, 
connectorAttributes, false, true));
 
     try {
       proc.serialize(outputStream);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index c17b4c5e1bd..b4c09e362cb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1661,15 +1661,16 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
   public SettableFuture<ConfigTaskResult> alterPipe(AlterPipeStatement 
alterPipeStatement) {
     SettableFuture<ConfigTaskResult> future = SettableFuture.create();
 
-    // Validate before alteration
+    // Validate before alteration - only validate replace mode
+    final String pipeName = alterPipeStatement.getPipeName();
     try {
-      if (!alterPipeStatement.getProcessorAttributes().isEmpty()) {
+      if (!alterPipeStatement.getProcessorAttributes().isEmpty()
+          && alterPipeStatement.isReplaceAllProcessorAttributes()) {
         
PipeAgent.plugin().validateProcessor(alterPipeStatement.getProcessorAttributes());
       }
-      if (!alterPipeStatement.getConnectorAttributes().isEmpty()) {
-        PipeAgent.plugin()
-            .validateConnector(
-                alterPipeStatement.getPipeName(), 
alterPipeStatement.getConnectorAttributes());
+      if (!alterPipeStatement.getConnectorAttributes().isEmpty()
+          && alterPipeStatement.isReplaceAllConnectorAttributes()) {
+        PipeAgent.plugin().validateConnector(pipeName, 
alterPipeStatement.getConnectorAttributes());
       }
     } catch (Exception e) {
       LOGGER.info("Failed to validate pipe statement, because {}", 
e.getMessage(), e);
@@ -1681,16 +1682,15 @@ public class ClusterConfigTaskExecutor implements 
IConfigTaskExecutor {
     try (ConfigNodeClient configNodeClient =
         
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
       TAlterPipeReq req =
-          new TAlterPipeReq()
-              .setPipeName(alterPipeStatement.getPipeName())
-              
.setProcessorAttributes(alterPipeStatement.getProcessorAttributes())
-              
.setConnectorAttributes(alterPipeStatement.getConnectorAttributes());
+          new TAlterPipeReq(
+              pipeName,
+              alterPipeStatement.getProcessorAttributes(),
+              alterPipeStatement.getConnectorAttributes(),
+              alterPipeStatement.isReplaceAllProcessorAttributes(),
+              alterPipeStatement.isReplaceAllConnectorAttributes());
       TSStatus tsStatus = configNodeClient.alterPipe(req);
       if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != tsStatus.getCode()) {
-        LOGGER.warn(
-            "Failed to alter pipe {} in config node, status is {}.",
-            alterPipeStatement.getPipeName(),
-            tsStatus);
+        LOGGER.warn("Failed to alter pipe {} in config node, status is {}.", 
pipeName, tsStatus);
         future.setException(new IoTDBException(tsStatus.message, 
tsStatus.code));
       } else {
         future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index fedc436cb32..fe96a9dcee2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -224,6 +224,7 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -3606,19 +3607,25 @@ public class ASTVisitor extends 
IoTDBSqlParserBaseVisitor<Statement> {
       throw new SemanticException(
           "Not support for this sql in ALTER PIPE, please enter pipe name.");
     }
-    if (ctx.modifyProcessorAttributesClause() != null) {
+    if (ctx.alterProcessorAttributesClause() != null) {
       alterPipeStatement.setProcessorAttributes(
           parseProcessorAttributesClause(
-              
ctx.modifyProcessorAttributesClause().processorAttributeClause()));
+              
ctx.alterProcessorAttributesClause().processorAttributeClause()));
+      alterPipeStatement.setReplaceAllProcessorAttributes(
+          Objects.nonNull(ctx.alterProcessorAttributesClause().REPLACE()));
     } else {
       alterPipeStatement.setProcessorAttributes(new HashMap<>());
+      alterPipeStatement.setReplaceAllProcessorAttributes(false);
     }
-    if (ctx.modifyConnectorAttributesClause() != null) {
+    if (ctx.alterConnectorAttributesClause() != null) {
       alterPipeStatement.setConnectorAttributes(
           parseConnectorAttributesClause(
-              
ctx.modifyConnectorAttributesClause().connectorAttributeClause()));
+              
ctx.alterConnectorAttributesClause().connectorAttributeClause()));
+      alterPipeStatement.setReplaceAllConnectorAttributes(
+          Objects.nonNull(ctx.alterConnectorAttributesClause().REPLACE()));
     } else {
       alterPipeStatement.setConnectorAttributes(new HashMap<>());
+      alterPipeStatement.setReplaceAllConnectorAttributes(false);
     }
     return alterPipeStatement;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
index 9fcdf2436dc..644d26ba0bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/metadata/pipe/AlterPipeStatement.java
@@ -39,6 +39,8 @@ public class AlterPipeStatement extends Statement implements 
IConfigStatement {
   private String pipeName;
   private Map<String, String> processorAttributes;
   private Map<String, String> connectorAttributes;
+  private boolean isReplaceAllProcessorAttributes;
+  private boolean isReplaceAllConnectorAttributes;
 
   public AlterPipeStatement(StatementType alterPipeStatement) {
     this.statementType = alterPipeStatement;
@@ -56,6 +58,14 @@ public class AlterPipeStatement extends Statement implements 
IConfigStatement {
     return connectorAttributes;
   }
 
+  public boolean isReplaceAllProcessorAttributes() {
+    return isReplaceAllProcessorAttributes;
+  }
+
+  public boolean isReplaceAllConnectorAttributes() {
+    return isReplaceAllConnectorAttributes;
+  }
+
   public void setPipeName(String pipeName) {
     this.pipeName = pipeName;
   }
@@ -68,6 +78,14 @@ public class AlterPipeStatement extends Statement implements 
IConfigStatement {
     this.connectorAttributes = connectorAttributes;
   }
 
+  public void setReplaceAllProcessorAttributes(boolean 
replaceAllProcessorAttributes) {
+    isReplaceAllProcessorAttributes = replaceAllProcessorAttributes;
+  }
+
+  public void setReplaceAllConnectorAttributes(boolean 
replaceAllConnectorAttributes) {
+    isReplaceAllConnectorAttributes = replaceAllConnectorAttributes;
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift 
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 52b4cccdc99..a04cfe588a5 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -701,8 +701,10 @@ struct TCreatePipeReq {
 
 struct TAlterPipeReq {
     1: required string pipeName
-    2: optional map<string, string> processorAttributes
-    3: optional map<string, string> connectorAttributes
+    2: required map<string, string> processorAttributes
+    3: required map<string, string> connectorAttributes
+    4: required bool isReplaceAllProcessorAttributes
+    5: required bool isReplaceAllConnectorAttributes
 }
 
 // Deprecated, restored for compatibility


Reply via email to