This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch codex/fix-pipe-schema-snapshot-database
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 67316b26f1dc1cf76828b8b89a15af806df2d9cd
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 11 15:01:39 2026 +0800

    Fix pipe schema snapshot database creation
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java | 41 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index e6c8ddefc99..d43840e8e23 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFil
 import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
 import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -101,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
@@ -625,8 +627,16 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
   private TSStatus loadSchemaSnapShot(
       final Map<String, String> parameters, final List<String> fileAbsolutePaths)
       throws IllegalPathException, IOException {
-    final PartialPath databasePath =
-        PartialPath.getQualifiedDatabasePartialPath(parameters.get(ColumnHeaderConstant.DATABASE));
+    final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE);
+    final PartialPath databasePath = PartialPath.getQualifiedDatabasePartialPath(databaseName);
+
+    if (!PathUtils.isTableModelDatabase(databaseName)) {
+      final TSStatus createDatabaseStatus = createSchemaSnapshotDatabaseIfNecessary(databasePath);
+      if (createDatabaseStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        return createDatabaseStatus;
+      }
+    }
+
     final SRStatementGenerator generator =
         SchemaRegionSnapshotParser.translate2Statements(
             Paths.get(fileAbsolutePaths.get(0)),
@@ -697,6 +707,33 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
     return PipeReceiverStatusHandler.getPriorStatus(results);
   }
 
+  private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath databasePath) {
+    final DatabaseSchemaStatement statement =
+        new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+    statement.setDatabasePath(databasePath);
+
+    final TSStatus status = executeStatementAndClassifyExceptions(statement);
+    if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      return status;
+    }
+
+    if (status.getCode() == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+      return Objects.equals(
+              status.getMessage(),
+              databasePath.getFullPath() + " has already been created as database")
+          ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+          : new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+              .setMessage(status.getMessage());
+    }
+
+    if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+      return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+          .setMessage(status.getMessage());
+    }
+
+    return status;
+  }
+
   private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq req) {
     // We may be able to skip the alter logical view's exception parsing because
     // the "AlterLogicalViewNode" is itself idempotent
