This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new c5d0c397d98 Pipe: Added userName / password logic for alter pipe 
replace statement (#15219)
c5d0c397d98 is described below

commit c5d0c397d986faeb36fdd7fceddfe66c66041129
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 28 15:46:59 2025 +0800

    Pipe: Added userName / password logic for alter pipe replace statement 
(#15219)
---
 .../execution/config/TableConfigTaskVisitor.java   | 23 ++++++++++++++-------
 .../execution/config/TreeConfigTaskVisitor.java    | 24 +++++++++++++++-------
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 8cfcd89d07f..e6f2546f389 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -976,23 +976,32 @@ public class TableConfigTaskVisitor extends 
AstVisitor<IConfigTask, MPPQueryCont
   }
 
   @Override
-  protected IConfigTask visitAlterPipe(AlterPipe node, MPPQueryContext 
context) {
+  protected IConfigTask visitAlterPipe(final AlterPipe node, final 
MPPQueryContext context) {
     context.setQueryType(QueryType.WRITE);
-    accessControl.checkUserIsAdmin(context.getSession().getUserName());
 
-    for (String ExtractorAttribute : node.getExtractorAttributes().keySet()) {
-      if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+    final String userName = context.getSession().getUserName();
+    accessControl.checkUserIsAdmin(userName);
+
+    final String pipeName = node.getPipeName();
+    final Map<String, String> extractorAttributes = 
node.getExtractorAttributes();
+    for (final String extractorAttributeKey : extractorAttributes.keySet()) {
+      if (extractorAttributeKey.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
         throw new SemanticException(
             String.format(
                 "Failed to alter pipe %s, modifying %s is not allowed.",
-                node.getPipeName(), ExtractorAttribute));
+                pipeName, extractorAttributeKey));
       }
     }
     // If the source is replaced, sql-dialect uses the current Alter Pipe 
sql-dialect. If it is
     // modified, the original sql-dialect is used.
     if (node.isReplaceAllExtractorAttributes()) {
-      node.getExtractorAttributes()
-          .put(SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TABLE_VALUE);
+      extractorAttributes.put(
+          SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TABLE_VALUE);
+      checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+    }
+
+    if (node.isReplaceAllConnectorAttributes()) {
+      checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(), 
userName);
     }
 
     return new AlterPipeTask(node);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index ec849227517..62aac3aa56c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -196,6 +196,7 @@ import org.apache.tsfile.exception.NotImplementedException;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import static 
org.apache.iotdb.commons.executable.ExecutableManager.getUnTrustedUriErrorMsg;
 import static 
org.apache.iotdb.commons.executable.ExecutableManager.isUriTrusted;
@@ -545,23 +546,32 @@ public class TreeConfigTaskVisitor extends 
StatementVisitor<IConfigTask, MPPQuer
 
   @Override
   public IConfigTask visitAlterPipe(
-      AlterPipeStatement alterPipeStatement, MPPQueryContext context) {
+      final AlterPipeStatement alterPipeStatement, final MPPQueryContext 
context) {
 
-    for (String ExtractorAttribute : 
alterPipeStatement.getExtractorAttributes().keySet()) {
-      if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+    for (final String extractorAttributeKey :
+        alterPipeStatement.getExtractorAttributes().keySet()) {
+      if (extractorAttributeKey.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
         throw new SemanticException(
             String.format(
                 "Failed to alter pipe %s, modifying %s is not allowed.",
-                alterPipeStatement.getPipeName(), ExtractorAttribute));
+                alterPipeStatement.getPipeName(), extractorAttributeKey));
       }
     }
 
+    final String userName = context.getSession().getUserName();
+    final String pipeName = alterPipeStatement.getPipeName();
+    final Map<String, String> extractorAttributes = 
alterPipeStatement.getExtractorAttributes();
+
     // If the source is replaced, sql-dialect uses the current Alter Pipe 
sql-dialect. If it is
     // modified, the original sql-dialect is used.
     if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
-      alterPipeStatement
-          .getExtractorAttributes()
-          .put(SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE);
+      extractorAttributes.put(
+          SystemConstant.SQL_DIALECT_KEY, 
SystemConstant.SQL_DIALECT_TREE_VALUE);
+      checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+    }
+
+    if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
+      checkAndEnrichSinkUserName(pipeName, 
alterPipeStatement.getConnectorAttributes(), userName);
     }
 
     return new AlterPipeTask(alterPipeStatement);

Reply via email to