This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6e9aa8ee50d Pipe: Fix a tree model pipe becomes table model pipe
exceptionally due to alter pipe operations (#14242)
6e9aa8ee50d is described below
commit 6e9aa8ee50d6a7b1b58d945fa72d2e42fd2d5349
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Dec 3 10:57:14 2024 +0800
Pipe: Fix a tree model pipe becomes table model pipe exceptionally due to
alter pipe operations (#14242)
---
.../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 5 +++-
.../iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java | 2 ++
.../execution/config/TableConfigTaskVisitor.java | 25 +++++++++++++++++++
.../execution/config/TreeConfigTaskVisitor.java | 28 ++++++++++++++++++++++
.../pipe/config/constant/SystemConstant.java | 2 ++
5 files changed, 61 insertions(+), 1 deletion(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index 364c5d475f0..66bae32c5b1 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
+import org.apache.iotdb.itbase.env.BaseEnv;
import org.junit.Assert;
import org.junit.Test;
@@ -105,7 +106,7 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
}
// Alter pipe (modify)
- try (final Connection connection = senderEnv.getConnection();
+ try (final Connection connection =
senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
final Statement statement = connection.createStatement()) {
statement.execute("alter pipe a2b modify source
('source.pattern'='root.test2')");
} catch (SQLException e) {
@@ -120,6 +121,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
// Check status
Assert.assertEquals("STOPPED", showPipeResult.get(0).state);
// Check configurations
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("__system.sql-dialect=tree"));
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("source.pattern=root.test2"));
Assert.assertTrue(
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
index 0f682c7f666..0f9a71a6ce9 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/tablemodel/IoTDBPipeAlterIT.java
@@ -293,6 +293,8 @@ public class IoTDBPipeAlterIT extends
AbstractPipeTableModelTestIT {
// check status
Assert.assertEquals("RUNNING", showPipeResult.get(0).state);
// check configurations
+
+
Assert.assertTrue(showPipeResult.get(0).pipeExtractor.contains("__system.sql-dialect=table"));
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("source=iotdb-source"));
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("database-name=test"));
Assert.assertFalse(showPipeResult.get(0).pipeExtractor.contains("table-name=test"));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
index ab93ad531cd..4abaf1bbff5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TableConfigTaskVisitor.java
@@ -613,6 +613,15 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
protected IConfigTask visitCreatePipe(CreatePipe node, MPPQueryContext
context) {
context.setQueryType(QueryType.WRITE);
+ for (String ExtractorAttribute : node.getExtractorAttributes().keySet()) {
+ if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+ throw new SemanticException(
+ String.format(
+ "Failed to create pipe %s, setting %s is not allowed.",
+ node.getPipeName(), ExtractorAttribute));
+ }
+ }
+
// Inject table model into the extractor attributes
node.getExtractorAttributes()
.put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
@@ -623,6 +632,22 @@ public class TableConfigTaskVisitor extends
AstVisitor<IConfigTask, MPPQueryCont
@Override
protected IConfigTask visitAlterPipe(AlterPipe node, MPPQueryContext
context) {
context.setQueryType(QueryType.WRITE);
+
+ for (String ExtractorAttribute : node.getExtractorAttributes().keySet()) {
+ if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+ throw new SemanticException(
+ String.format(
+ "Failed to alter pipe %s, modifying %s is not allowed.",
+ node.getPipeName(), ExtractorAttribute));
+ }
+ }
+ // If the source is replaced, sql-dialect uses the current Alter Pipe
sql-dialect. If it is
+ // modified, the original sql-dialect is used.
+ if (node.isReplaceAllExtractorAttributes()) {
+ node.getExtractorAttributes()
+ .put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TABLE_VALUE);
+ }
+
return new AlterPipeTask(node);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 5e9c9603e31..f1fed8f7b50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.execution.config;
import org.apache.iotdb.common.rpc.thrift.Model;
import org.apache.iotdb.commons.pipe.config.constant.SystemConstant;
+import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountDatabaseTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.CountTimeSlotListTask;
@@ -463,6 +464,15 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
@Override
public IConfigTask visitCreatePipe(
CreatePipeStatement createPipeStatement, MPPQueryContext context) {
+ for (String ExtractorAttribute :
createPipeStatement.getExtractorAttributes().keySet()) {
+ if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+ throw new SemanticException(
+ String.format(
+ "Failed to create pipe %s, setting %s is not allowed.",
+ createPipeStatement.getPipeName(), ExtractorAttribute));
+ }
+ }
+
// Inject tree model into the extractor attributes
createPipeStatement
.getExtractorAttributes()
@@ -474,6 +484,24 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
@Override
public IConfigTask visitAlterPipe(
AlterPipeStatement alterPipeStatement, MPPQueryContext context) {
+
+ for (String ExtractorAttribute :
alterPipeStatement.getExtractorAttributes().keySet()) {
+ if (ExtractorAttribute.startsWith(SystemConstant.SYSTEM_PREFIX_KEY)) {
+ throw new SemanticException(
+ String.format(
+ "Failed to alter pipe %s, modifying %s is not allowed.",
+ alterPipeStatement.getPipeName(), ExtractorAttribute));
+ }
+ }
+
+ // If the source is replaced, sql-dialect uses the current Alter Pipe
sql-dialect. If it is
+ // modified, the original sql-dialect is used.
+ if (alterPipeStatement.isReplaceAllExtractorAttributes()) {
+ alterPipeStatement
+ .getExtractorAttributes()
+ .put(SystemConstant.SQL_DIALECT_KEY,
SystemConstant.SQL_DIALECT_TREE_VALUE);
+ }
+
return new AlterPipeTask(alterPipeStatement);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
index 852bc7cc1be..105390eab9c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/constant/SystemConstant.java
@@ -28,6 +28,8 @@ import java.util.Set;
public class SystemConstant {
+ public static final String SYSTEM_PREFIX_KEY = "__system";
+
public static final String RESTART_KEY = "__system.restart";
public static final boolean RESTART_DEFAULT_VALUE = false;