This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch codex/fix-pipe-schema-snapshot-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 67316b26f1dc1cf76828b8b89a15af806df2d9cd
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 11 15:01:39 2026 +0800

    Fix pipe schema snapshot database creation
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     | 41 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index e6c8ddefc99..d43840e8e23 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFil
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -101,6 +102,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
@@ -625,8 +627,16 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
   private TSStatus loadSchemaSnapShot(
       final Map<String, String> parameters, final List<String> 
fileAbsolutePaths)
       throws IllegalPathException, IOException {
-    final PartialPath databasePath =
-        
PartialPath.getQualifiedDatabasePartialPath(parameters.get(ColumnHeaderConstant.DATABASE));
+    final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE);
+    final PartialPath databasePath = 
PartialPath.getQualifiedDatabasePartialPath(databaseName);
+
+    if (!PathUtils.isTableModelDatabase(databaseName)) {
+      final TSStatus createDatabaseStatus = 
createSchemaSnapshotDatabaseIfNecessary(databasePath);
+      if (createDatabaseStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return createDatabaseStatus;
+      }
+    }
+
     final SRStatementGenerator generator =
         SchemaRegionSnapshotParser.translate2Statements(
             Paths.get(fileAbsolutePaths.get(0)),
@@ -697,6 +707,33 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
     return PipeReceiverStatusHandler.getPriorStatus(results);
   }
 
+  private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath 
databasePath) {
+    final DatabaseSchemaStatement statement =
+        new 
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+    statement.setDatabasePath(databasePath);
+
+    final TSStatus status = executeStatementAndClassifyExceptions(statement);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+
+    if (status.getCode() == 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+      return Objects.equals(
+              status.getMessage(),
+              databasePath.getFullPath() + " has already been created as 
database")
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+              .setMessage(status.getMessage());
+    }
+
+    if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+      return new 
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
+    }
+
+    return status;
+  }
+
   private TPipeTransferResp handleTransferSchemaPlan(final 
PipeTransferPlanNodeReq req) {
     // We may be able to skip the alter logical view's exception parsing 
because
     // the "AlterLogicalViewNode" is itself idempotent

Reply via email to