This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ddb9c7cf0a9 [To dev/1.3] Fix pipe schema snapshot database creation
(#17944)
ddb9c7cf0a9 is described below
commit ddb9c7cf0a9d8f51e9f2a6cdb98dc3ea31626b6b
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 18 11:01:34 2026 +0800
[To dev/1.3] Fix pipe schema snapshot database creation (#17944)
* Fix pipe schema snapshot database creation (#17910)
* Fix pipe schema snapshot database creation
* Fix legacy pipe receiver database conflict handling
* Fix pipe enriched config statement execution
(cherry picked from commit 98c823461be83181e454d15152d9c67e61c7b06d)
* Skip unrelated pipe schema snapshot databases
---
.../pipe/sink/PipeConfigNodeThriftRequestTest.java | 18 ++++++++
...eConfigPhysicalPlanPatternParseVisitorTest.java | 42 +++++++++++++++++
.../legacy/IoTDBLegacyPipeReceiverAgent.java | 3 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 54 ++++++++++++++++++++--
.../iotdb/db/queryengine/plan/Coordinator.java | 15 ++++--
.../protocol/thrift/IoTDBDataNodeReceiverTest.java | 9 ++++
6 files changed, 131 insertions(+), 10 deletions(-)
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
index f014a8ae22a..f1d17d8a0f0 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/sink/PipeConfigNodeThriftRequestTest.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigP
import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotPieceReq;
import
org.apache.iotdb.confignode.manager.pipe.sink.payload.PipeTransferConfigSnapshotSealReq;
import org.apache.iotdb.confignode.persistence.schema.CNSnapshotFileType;
+import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.junit.Assert;
import org.junit.Test;
@@ -97,4 +98,21 @@ public class PipeConfigNodeThriftRequestTest {
Assert.assertEquals(req.getFileLengths(), deserializeReq.getFileLengths());
Assert.assertEquals(req.getParameters(), deserializeReq.getParameters());
}
+
+ @Test
+ public void testPipeTransferConfigSnapshotSealReqPreservesPathPattern()
throws IOException {
+ String snapshotName = "cluster_schema.bin";
+ String templateInfoName = "template_info.bin";
+ CNSnapshotFileType fileType = CNSnapshotFileType.SCHEMA;
+ String typeString = "200";
+
+ PipeTransferConfigSnapshotSealReq req =
+ PipeTransferConfigSnapshotSealReq.toTPipeTransferReq(
+ "root.ln.**", snapshotName, 100, templateInfoName, 10, fileType,
typeString);
+ PipeTransferConfigSnapshotSealReq deserializeReq =
+ PipeTransferConfigSnapshotSealReq.fromTPipeTransferReq(req);
+
+ Assert.assertEquals(
+ "root.ln.**",
deserializeReq.getParameters().get(ColumnHeaderConstant.PATH_PATTERN));
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
index 8125a980acd..a246ea9de90 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/pipe/source/PipeConfigPhysicalPlanPatternParseVisitorTest.java
@@ -91,6 +91,28 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest {
.orElseThrow(AssertionError::new));
}
+ @Test
+ public void testCreateDatabaseWithProcess() {
+ final DatabaseSchemaPlan includedCreateDatabasePlan =
+ new DatabaseSchemaPlan(
+ ConfigPhysicalPlanType.CreateDatabase, new
TDatabaseSchema("root.ln"));
+ final DatabaseSchemaPlan excludedCreateDatabasePlan =
+ new DatabaseSchemaPlan(
+ ConfigPhysicalPlanType.CreateDatabase, new
TDatabaseSchema("root.db"));
+ final IoTDBPipePatternOperations rootLnPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**"));
+
+ Assert.assertEquals(
+ includedCreateDatabasePlan,
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .process(includedCreateDatabasePlan, rootLnPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .process(excludedCreateDatabasePlan, rootLnPattern)
+ .isPresent());
+ }
+
@Test
public void testAlterDatabase() {
final DatabaseSchemaPlan alterDatabasePlan =
@@ -201,6 +223,26 @@ public class PipeConfigPhysicalPlanPatternParseVisitorTest
{
.orElseThrow(AssertionError::new));
}
+ @Test
+ public void testCommitSetSchemaTemplateWithRootLnPattern() {
+ final IoTDBPipePatternOperations rootLnPattern =
+ new UnionIoTDBPipePattern(new IoTDBPipePattern("root.ln.**"));
+ final CommitSetSchemaTemplatePlan includedSetTemplatePlan =
+ new CommitSetSchemaTemplatePlan("t1", "root.ln.wf01");
+ final CommitSetSchemaTemplatePlan excludedSetTemplatePlan =
+ new CommitSetSchemaTemplatePlan("t1", "root.db.wf01");
+
+ Assert.assertEquals(
+ includedSetTemplatePlan,
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .process(includedSetTemplatePlan, rootLnPattern)
+ .orElseThrow(AssertionError::new));
+ Assert.assertFalse(
+ IoTDBConfigRegionSource.PATTERN_PARSE_VISITOR
+ .process(excludedSetTemplatePlan, rootLnPattern)
+ .isPresent());
+ }
+
@Test
public void testPipeUnsetSchemaTemplate() {
final PipeUnsetSchemaTemplatePlan pipeUnsetSchemaTemplatePlanOnPrefix =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
index cc85e2f4f10..b9e26ee0fa5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/legacy/IoTDBLegacyPipeReceiverAgent.java
@@ -160,8 +160,7 @@ public class IoTDBLegacyPipeReceiverAgent {
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold(),
false);
if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()
- && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()
- && result.status.code !=
TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+ && result.status.code !=
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
LOGGER.error(
"Create Database error, statement: {}, result status : {}.",
statement, result.status);
return false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 8a5c4ad060f..4c1ed974811 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -83,6 +83,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsSta
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.DatabaseSchemaStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
@@ -536,17 +537,29 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus loadSchemaSnapShot(
final Map<String, String> parameters, final List<String>
fileAbsolutePaths)
throws IllegalPathException, IOException {
+ final String databaseName = parameters.get(ColumnHeaderConstant.DATABASE);
+ final PartialPath databasePath = new PartialPath(databaseName);
+
+ final String pathPattern =
parameters.get(ColumnHeaderConstant.PATH_PATTERN);
+ if (!shouldLoadSchemaSnapshotDatabase(pathPattern, databaseName)) {
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+ final PipePattern pipePattern =
+ PipePattern.parsePatternFromString(pathPattern, IoTDBPipePattern::new);
+
+ final TSStatus createDatabaseStatus =
createSchemaSnapshotDatabaseIfNecessary(databasePath);
+ if (createDatabaseStatus.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return createDatabaseStatus;
+ }
+
final SRStatementGenerator generator =
SchemaRegionSnapshotParser.translate2Statements(
Paths.get(fileAbsolutePaths.get(0)),
fileAbsolutePaths.size() > 1 ? Paths.get(fileAbsolutePaths.get(1))
: null,
- new PartialPath(parameters.get(ColumnHeaderConstant.DATABASE)));
+ databasePath);
final Set<StatementType> executionTypes =
PipeSchemaRegionSnapshotEvent.getStatementTypeSet(
parameters.get(ColumnHeaderConstant.TYPE));
- final PipePattern pipePattern =
- PipePattern.parsePatternFromString(
- parameters.get(ColumnHeaderConstant.PATH_PATTERN),
IoTDBPipePattern::new);
// Clear to avoid previous exceptions
batchVisitor.clear();
@@ -571,6 +584,39 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
return PipeReceiverStatusHandler.getPriorStatus(results);
}
+ static boolean shouldLoadSchemaSnapshotDatabase(
+ final String pathPattern, final String databaseName) {
+ return PipePattern.parsePatternFromString(pathPattern,
IoTDBPipePattern::new)
+ .mayOverlapWithDb(databaseName);
+ }
+
+ private TSStatus createSchemaSnapshotDatabaseIfNecessary(final PartialPath
databasePath) {
+ final DatabaseSchemaStatement statement =
+ new
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+ statement.setDatabasePath(databasePath);
+
+ final TSStatus status = executeStatementAndClassifyExceptions(statement);
+ if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ if (status.getCode() ==
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ return Objects.equals(
+ status.getMessage(),
+ databasePath.getFullPath() + " has already been created as
database")
+ ? RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS)
+ : new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(status.getMessage());
+ }
+
+ if (status.getCode() == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+ return new
TSStatus(TSStatusCode.PIPE_RECEIVER_USER_CONFLICT_EXCEPTION.getStatusCode())
+ .setMessage(status.getMessage());
+ }
+
+ return status;
+ }
+
private TPipeTransferResp handleTransferSchemaPlan(final
PipeTransferPlanNodeReq req) {
// We may be able to skip the alter logical view's exception parsing
because
// the "AlterLogicalViewNode" is itself idempotent
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 2f173861f22..2a548557d8c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -48,6 +48,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.TreeModelPlanner;
import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.utils.SetThreadName;
import org.slf4j.Logger;
@@ -208,13 +209,19 @@ public class Coordinator {
long startTime) {
queryContext.setTimeOut(timeOut);
queryContext.setStartTime(startTime);
- if (statement instanceof IConfigStatement) {
- queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
+ final Statement configStatement =
+ statement instanceof PipeEnrichedStatement
+ && ((PipeEnrichedStatement) statement).getInnerStatement()
+ instanceof IConfigStatement
+ ? ((PipeEnrichedStatement) statement).getInnerStatement()
+ : statement;
+ if (configStatement instanceof IConfigStatement) {
+ queryContext.setQueryType(((IConfigStatement)
configStatement).getQueryType());
return new ConfigExecution(
queryContext,
- statement.getType(),
+ configStatement.getType(),
executor,
- statement.accept(new ConfigTaskVisitor(), queryContext));
+ configStatement.accept(new ConfigTaskVisitor(), queryContext));
}
TreeModelPlanner treeModelPlanner =
new TreeModelPlanner(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index 1e279a18feb..9eac46c8351 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -110,6 +110,15 @@ public class IoTDBDataNodeReceiverTest {
}
}
+ @Test
+ public void testSchemaSnapshotDatabaseIsFilteredByPattern() {
+ Assert.assertTrue(
+ IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**",
"root.ln"));
+
Assert.assertTrue(IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.**",
"root.db"));
+ Assert.assertFalse(
+ IoTDBDataNodeReceiver.shouldLoadSchemaSnapshotDatabase("root.ln.**",
"root.db"));
+ }
+
@Test
public void testLoadTsFileSyncStatementVerifiesSchemaWhenConvertingType()
throws Exception {
final Path tsFile =
Files.createTempFile("pipe-load-convert-verify-schema", ".tsfile");