This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c5d0c397d98 Pipe: Added userName / password logic for alter pipe
replace statement (#15219)
c5d0c397d98 is described below
commit c5d0c397d986faeb36fdd7fceddfe66c66041129
Author: Caideyipi <[email protected]>
AuthorDate: Fri Mar 28 15:46:59 2025 +0800
Pipe: Added userName / password logic for alter pipe replace statement
(#15219)
---
.../execution/config/TableConfigTaskVisitor.java | 23 ++++++++++++++-------
.../execution/config/TreeConfigTaskVisitor.java | 24 +++++++++++++++-------
2 files changed, 33 insertions(+), 14 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index 8cfcd89d07f..e6f2546f389 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -976,23 +976,32 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
}
@Override
- protected IConfigTask visitAlterPipe(AlterPipe node, MPPQueryContext
context) {
+ protected IConfigTask visitAlterPipe(final AlterPipe node, final
MPPQueryContext context) {
context.setQueryType(QueryType.WRITE);
- accessControl.checkUserIsAdmin(context.getSession().getUserName());
- for (String ExtractorAttribute : node.getExtractorAttributes().keySet()) {
- if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+ final String userName = context.getSession().getUserName();
+ accessControl.checkUserIsAdmin(userName);
+
+ final String pipeName = node.getPipeName();
+ final Map<String, String> extractorAttributes =
node.getExtractorAttributes();
+ for (final String extractorAttributeKey : extractorAttributes.keySet()) {
+ if (extractorAttributeKey.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
throw new SemanticException(
String.format(
"Failed to alter pipe %s, modifying %s is not allowed.",
- node.getPipeName(), ExtractorAttribute));
+ pipeName, extractorAttributeKey));
}
}
// If the source is replaced, sql-dialect uses the current Alter Pipe
sql-dialect. If it is
// modified, the original sql-dialect is used.
if (node.isReplaceAllExtractorAttributes()) {
- node.getExtractorAttributes()
- .put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
+ extractorAttributes.put(
+ SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
+ checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+ }
+
+ if (node.isReplaceAllConnectorAttributes()) {
+ checkAndEnrichSinkUserName(pipeName, node.getConnectorAttributes(),
userName);
}
return new AlterPipeTask(node);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index ec849227517..62aac3aa56c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -196,6 +196,7 @@ import org.apache.tsfile.exception.NotImplementedException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import static
org.apache.iotdb.commons.executable.ExecutableManager.getUnTrustedUriErrorMsg;
import static
org.apache.iotdb.commons.executable.ExecutableManager.isUriTrusted;
@@ -545,23 +546,32 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
@Override
public IConfigTask visitAlterPipe(
- AlterPipeStatement alterPipeStatement, MPPQueryContext context) {
+ final AlterPipeStatement alterPipeStatement, final MPPQueryContext
context) {
- for (String ExtractorAttribute :
alterPipeStatement.getExtractorAttributes().keySet()) {
- if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+ for (final String extractorAttributeKey :
+ alterPipeStatement.getExtractorAttributes().keySet()) {
+ if (extractorAttributeKey.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
throw new SemanticException(
String.format(
"Failed to alter pipe %s, modifying %s is not allowed.",
- alterPipeStatement.getPipeName(), ExtractorAttribute));
+ alterPipeStatement.getPipeName(), extractorAttributeKey));
}
}
+ final String userName = context.getSession().getUserName();
+ final String pipeName = alterPipeStatement.getPipeName();
+ final Map<String, String> extractorAttributes =
alterPipeStatement.getExtractorAttributes();
+
// If the source is replaced, sql-dialect uses the current Alter Pipe
sql-dialect. If it is
// modified, the original sql-dialect is used.
if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
- alterPipeStatement
- .getExtractorAttributes()
- .put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
+ extractorAttributes.put(
+ SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
+ checkAndEnrichSourceUserName(pipeName, extractorAttributes, userName);
+ }
+
+ if (alterPipeStatement.isReplaceAllConnectorAttributes()) {
+ checkAndEnrichSinkUserName(pipeName,
alterPipeStatement.getConnectorAttributes(), userName);
}
return new AlterPipeTask(alterPipeStatement);