This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 54c29aaabdc Pipe: Fix the receiver fails to automatically create a
table when receiving InsertRowsNode (#13935)
54c29aaabdc is described below
commit 54c29aaabdc5dd935de70f86fef078d0d854c935
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Oct 29 03:29:42 2024 +0800
Pipe: Fix the receiver fails to automatically create a table when receiving
InsertRowsNode (#13935)
---
.../db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java | 7 +++++--
.../iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java | 9 ++++++++-
.../queryengine/plan/statement/pipe/PipeEnrichedStatement.java | 8 +++++++-
3 files changed, 20 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 5a974d1e35f..5f438bbd8fa 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -796,8 +796,11 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return;
}
- final CreateDBTask task =
- new CreateDBTask(new TDatabaseSchema(ROOT + PATH_SEPARATOR_CHAR +
database), true);
+ final TDatabaseSchema schema =
+ new TDatabaseSchema(new TDatabaseSchema(ROOT + PATH_SEPARATOR_CHAR +
database));
+ schema.setIsTableModel(true);
+
+ final CreateDBTask task = new CreateDBTask(schema, true);
try {
final ListenableFuture<ConfigTaskResult> future =
task.execute(ClusterConfigTaskExecutor.getInstance());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
index 8650c0fc18a..75c228fb0bc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
@@ -37,6 +37,9 @@ import java.util.List;
public class InsertRows extends WrappedInsertStatement {
+ // Only InsertRows constructed by Pipe will be set to true
+ private boolean allowCreateTable = false;
+
public InsertRows(InsertRowsStatement insertRowsStatement, MPPQueryContext
context) {
super(insertRowsStatement, context);
}
@@ -79,6 +82,10 @@ public class InsertRows extends WrappedInsertStatement {
throw new UnsupportedOperationException();
}
+ public void setAllowCreateTable(boolean allowCreateTable) {
+ this.allowCreateTable = allowCreateTable;
+ }
+
@Override
public void validateTableSchema(Metadata metadata, MPPQueryContext context) {
for (InsertRowStatement insertRowStatement :
@@ -92,7 +99,7 @@ public class InsertRows extends WrappedInsertStatement {
AnalyzeUtils.getDatabaseName(insertRowStatement, context),
incomingTableSchema,
context,
- false)
+ allowCreateTable)
.orElse(null);
}
if (realSchema == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
index 824f65a085f..e8b60b46b77 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.statement.pipe;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRows;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -77,6 +78,11 @@ public class PipeEnrichedStatement extends Statement {
@Override
public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement
toRelationalStatement(
MPPQueryContext context) {
- return new PipeEnriched(innerStatement.toRelationalStatement(context));
+ final PipeEnriched pipeEnriched =
+ new PipeEnriched(innerStatement.toRelationalStatement(context));
+ if (pipeEnriched.getInnerStatement() instanceof InsertRows) {
+ ((InsertRows)
pipeEnriched.getInnerStatement()).setAllowCreateTable(true);
+ }
+ return pipeEnriched;
}
}