This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 54c29aaabdc Pipe: Fix the receiver fails to automatically create a table when receiving InsertRowsNode (#13935)
54c29aaabdc is described below

commit 54c29aaabdc5dd935de70f86fef078d0d854c935
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Oct 29 03:29:42 2024 +0800

    Pipe: Fix the receiver fails to automatically create a table when receiving InsertRowsNode (#13935)
---
 .../db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java  | 7 +++++--
 .../iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java | 9 ++++++++-
 .../queryengine/plan/statement/pipe/PipeEnrichedStatement.java   | 8 +++++++-
 3 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 5a974d1e35f..5f438bbd8fa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -796,8 +796,11 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
       return;
     }
 
-    final CreateDBTask task =
-        new CreateDBTask(new TDatabaseSchema(ROOT + PATH_SEPARATOR_CHAR + database), true);
+    final TDatabaseSchema schema =
+        new TDatabaseSchema(new TDatabaseSchema(ROOT + PATH_SEPARATOR_CHAR + database));
+    schema.setIsTableModel(true);
+
+    final CreateDBTask task = new CreateDBTask(schema, true);
     try {
       final ListenableFuture<ConfigTaskResult> future =
           task.execute(ClusterConfigTaskExecutor.getInstance());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
index 8650c0fc18a..75c228fb0bc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/InsertRows.java
@@ -37,6 +37,9 @@ import java.util.List;
 
 public class InsertRows extends WrappedInsertStatement {
 
+  // Only InsertRows constructed by Pipe will be set to true
+  private boolean allowCreateTable = false;
+
   public InsertRows(InsertRowsStatement insertRowsStatement, MPPQueryContext context) {
     super(insertRowsStatement, context);
   }
@@ -79,6 +82,10 @@ public class InsertRows extends WrappedInsertStatement {
     throw new UnsupportedOperationException();
   }
 
+  public void setAllowCreateTable(boolean allowCreateTable) {
+    this.allowCreateTable = allowCreateTable;
+  }
+
   @Override
   public void validateTableSchema(Metadata metadata, MPPQueryContext context) {
     for (InsertRowStatement insertRowStatement :
@@ -92,7 +99,7 @@ public class InsertRows extends WrappedInsertStatement {
                     AnalyzeUtils.getDatabaseName(insertRowStatement, context),
                     incomingTableSchema,
                     context,
-                    false)
+                    allowCreateTable)
                 .orElse(null);
       }
       if (realSchema == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
index 824f65a085f..e8b60b46b77 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/pipe/PipeEnrichedStatement.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.queryengine.plan.statement.pipe;
 
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRows;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.PipeEnriched;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
@@ -77,6 +78,11 @@ public class PipeEnrichedStatement extends Statement {
   @Override
   public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelationalStatement(
       MPPQueryContext context) {
-    return new PipeEnriched(innerStatement.toRelationalStatement(context));
+    final PipeEnriched pipeEnriched =
+        new PipeEnriched(innerStatement.toRelationalStatement(context));
+    if (pipeEnriched.getInnerStatement() instanceof InsertRows) {
+      ((InsertRows) pipeEnriched.getInnerStatement()).setAllowCreateTable(true);
+    }
+    return pipeEnriched;
   }
 }

Reply via email to