This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 98c823461be Fix pipe schema snapshot database creation (#17910)
98c823461be is described below
commit 98c823461be83181e454d15152d9c67e61c7b06d
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 15 15:12:12 2026 +0800
Fix pipe schema snapshot database creation (#17910)
* Fix pipe schema snapshot database creation
* Fix legacy pipe receiver database conflict handling
* Fix pipe enriched config statement execution
---
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 3 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 41 ++++++++++++++++++++--
.../iotdb/db/queryengine/plan/Coordinator.java | 15 +++++---
3 files changed, 51 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index c4c3986259f..feeabc698ff 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -175,8 +175,7 @@ public class IoTDBLegacyPipeReceiverAgent {
false,
false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
- && result.status.code != TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+ && result.status.code != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
LOGGER.error(
DataNodePipeMessages.CREATE_DATABASE_ERROR_STATEMENT_RESULT_STATUS,
statement,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index ba7f5f7e03e..fb2eabb6c85 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFil
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -101,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
@@ -625,8 +627,16 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private TSStatus loadSchemaSnapShot(
final Map<String, String> parameters, final List<String> fileAbsolutePaths)
throws IllegalPathException, IOException {
- final PartialPath databasePath =
- PartialPath.getQualifiedDatabasePartialPath(parameters.get(ColumnHeaderConstant.DATABASE));
+ final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE);
+ final PartialPath databasePath = PartialPath.getQualifiedDatabasePartialPath(databaseName);
+
+ if (!PathUtils.isTableModelDatabase(databaseName)) {
+ final TSStatus createDatabaseStatus = createSchemaSnapshotDatabaseIfNecessary(databasePath);
+ if (createDatabaseStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return createDatabaseStatus;
+ }
+ }
+
final SRStatementGenerator generator =
SchemaRegionSnapshotParser.translate2Statements(
Paths.get(fileAbsolutePaths.get(0)),
@@ -697,6 +707,33 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
return PipeReceiverStatusHandler.getPriorStatus(results);
}
+ private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath databasePath) {
+ final DatabaseSchemaStatement statement =
+ new DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+ statement.setDatabasePath(databasePath);
+
+ final TSStatus status = executeStatementAndClassifyExceptions(statement);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ if (status.getCode() == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ return Objects.equals(
+ status.getMessage(),
+ databasePath.getFullPath() + " has already been created as database")
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+ : new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(status.getMessage());
+ }
+
+ if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+ return new TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(status.getMessage());
+ }
+
+ return status;
+ }
+
private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq req) {
// We may be able to skip the alter logical view's exception parsing because
// the "AlterLogicalViewNode" is itself idempotent
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index d520bdf6a2c..73a09597cd5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -154,6 +154,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewr
import org.apache.iotdb.db.queryengine.plan.relational.sql.rewrite.StatementRewriteFactory;
import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.thrift.TBase;
@@ -400,13 +401,19 @@ public class Coordinator {
long startTime) {
queryContext.setTimeOut(timeOut);
queryContext.setStartTime(startTime);
- if (statement instanceof IConfigStatement) {
- queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
+ final Statement configStatement =
+ statement instanceof PipeEnrichedStatement
+ && ((PipeEnrichedStatement) statement).getInnerStatement()
+ instanceof IConfigStatement
+ ? ((PipeEnrichedStatement) statement).getInnerStatement()
+ : statement;
+ if (configStatement instanceof IConfigStatement) {
+ queryContext.setQueryType(((IConfigStatement) configStatement).getQueryType());
return new ConfigExecution(
queryContext,
- statement.getType(),
+ configStatement.getType(),
executor,
- statement.accept(new TreeConfigTaskVisitor(), queryContext));
+ configStatement.accept(new TreeConfigTaskVisitor(), queryContext));
}
TreeModelPlanner treeModelPlanner =
new TreeModelPlanner(