This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 98c823461be Fix pipe schema snapshot database creation (#17910)
98c823461be is described below

commit 98c823461be83181e454d15152d9c67e61c7b06d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 15 15:12:12 2026 +0800

    Fix pipe schema snapshot database creation (#17910)
    
    * Fix pipe schema snapshot database creation
    
    * Fix legacy pipe receiver database conflict handling
    
    * Fix pipe enriched config statement execution
---
 .../legacy/IoTDBLegacyPipeReceiverAgent.java       |  3 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 41 ++++++++++++++++++++--
 .../iotdb/db/queryengine/plan/Coordinator.java     | 15 +++++---
 3 files changed, 51 insertions(+), 8 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index c4c3986259f..feeabc698ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -175,8 +175,7 @@ public class IoTDBLegacyPipeReceiverAgent {
                   false,
                   false);
       if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
-          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
-          && result.status.code != 
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+          && result.status.code != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
         LOGGER.error(
             DataNodePipeMessages.CREATE_DATABASE_ERROR_STATEMENT_RESULT_STATUS,
             statement,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index ba7f5f7e03e..fb2eabb6c85 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -45,6 +45,7 @@ import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFil
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -101,6 +102,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
@@ -625,8 +627,16 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus loadSchemaSnapShot(
       final Map<String, String> parameters, final List<String> 
fileAbsolutePaths)
       throws IllegalPathException, IOException {
-    final PartialPath databasePath =
-        
PartialPath.getQualifiedDatabasePartialPath(parameters.get(ColumnHeaderConstant.DATABASE));
+    final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE);
+    final PartialPath databasePath = 
PartialPath.getQualifiedDatabasePartialPath(databaseName);
+
+    if (!PathUtils.isTableModelDatabase(databaseName)) {
+      final TSStatus createDatabaseStatus = 
createSchemaSnapshotDatabaseIfNecessary(databasePath);
+      if (createDatabaseStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return createDatabaseStatus;
+      }
+    }
+
     final SRStatementGenerator generator =
         SchemaRegionSnapshotParser.translate2Statements(
             Paths.get(fileAbsolutePaths.get(0)),
@@ -697,6 +707,33 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return PipeReceiverStatusHandler.getPriorStatus(results);
   }
 
+  private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath 
databasePath) {
+    final DatabaseSchemaStatement statement =
+        new 
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+    statement.setDatabasePath(databasePath);
+
+    final TSStatus status = executeStatementAndClassifyExceptions(statement);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+
+    if (status.getCode() == 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+      return Objects.equals(
+              status.getMessage(),
+              databasePath.getFullPath() + " has already been created as 
database")
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+              .setMessage(status.getMessage());
+    }
+
+    if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
+    }
+
+    return status;
+  }
+
   private TPipeTransferResp handleTransferSchemaPlan(final 
PipeTransferPlanNodeReq req) {
     // We may be able to skip the alter logical view's exception parsing 
because
     // the "AlterLogicalViewNode" is itself idempotent
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index d520bdf6a2c..73a09597cd5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -154,6 +154,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewr
 import 
org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewriteFactory;
 import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.utils.SetThreadName;
 
 import org.apache.thrift.TBase;
@@ -400,13 +401,19 @@ public class Coordinator {
       long startTime) {
     queryContext.setTimeOut(timeOut);
     queryContext.setStartTime(startTime);
-    if (statement instanceof IConfigStatement) {
-      queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
+    final Statement configStatement =
+        statement instanceof PipeEnrichedStatement
+                && ((PipeEnrichedStatement) statement).getInnerStatement()
+                    instanceof IConfigStatement
+            ? ((PipeEnrichedStatement) statement).getInnerStatement()
+            : statement;
+    if (configStatement instanceof IConfigStatement) {
+      queryContext.setQueryType(((IConfigStatement) 
configStatement).getQueryType());
       return new ConfigExecution(
           queryContext,
-          statement.getType(),
+          configStatement.getType(),
           executor,
-          statement.accept(new TreeConfigTaskVisitor(), queryContext));
+          configStatement.accept(new TreeConfigTaskVisitor(), queryContext));
     }
     TreeModelPlanner treeModelPlanner =
         new TreeModelPlanner(

Reply via email to