This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 28c4e68a6c4 Fix pipe tree database creation on receiver (#17991)
28c4e68a6c4 is described below
commit 28c4e68a6c40fb8210eab994f50c647d40ba17a3
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 22 13:53:39 2026 +0800
Fix pipe tree database creation on receiver (#17991)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 147 ++++++++++++++++++++-
...eeStatementDataTypeConvertExecutionVisitor.java | 13 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 1 +
.../protocol/thrift/IoTDBDataNodeReceiverTest.java | 55 ++++++++
.../scheduler/load/LoadTsFileSchedulerTest.java | 31 +++++
5 files changed, 239 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 4f101e1796a..978d924c056 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -26,6 +26,8 @@ import org.apache.iotdb.commons.disk.FolderManager;
import org.apache.iotdb.commons.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.commons.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.IoTDBException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
@@ -89,6 +91,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import
org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.ClusterConfigTaskExecutor;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.DatabaseSchemaTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational.CreateDBTask;
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.metadata.write.view.AlterLogicalViewNode;
@@ -99,6 +102,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsOfOneDeviceStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
@@ -133,6 +137,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -166,7 +171,8 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
this::executeStatementForTableModel);
private final PipeTreeStatementDataTypeConvertExecutionVisitor
treeStatementDataTypeConvertExecutionVisitor =
- new
PipeTreeStatementDataTypeConvertExecutionVisitor(this::executeStatementForTreeModel);
+ new PipeTreeStatementDataTypeConvertExecutionVisitor(
+ statement -> executeStatementForTreeModel(statement,
getTreeDatabaseName(statement)));
public final PipeTreeStatementToBatchVisitor batchVisitor = new
PipeTreeStatementToBatchVisitor();
// Used for data transfer: confignode (cluster A) -> datanode (cluster B) ->
confignode (cluster
@@ -186,6 +192,14 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
private static final PipeConfig PIPE_CONFIG = PipeConfig.getInstance();
private PipeMemoryBlock allocatedMemoryBlock;
+ private final Set<String> autoCreatedTreeDatabases =
ConcurrentHashMap.newKeySet();
+ private final Set<String> conflictedTreeDatabases =
ConcurrentHashMap.newKeySet();
+
+ private enum TreeDatabaseCreationResult {
+ SKIPPED,
+ CREATED_OR_EXISTED,
+ CONFLICTED
+ }
static {
try {
@@ -988,6 +1002,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
((InsertBaseStatement) statement).getDatabaseName().isPresent()
? ((InsertBaseStatement) statement).getDatabaseName().get()
: null;
+ } else if (statement instanceof InsertBaseStatement) {
+ isTableModelStatement = false;
+ databaseName = getTreeDatabaseName(statement);
} else {
isTableModelStatement = false;
databaseName = null;
@@ -1038,7 +1055,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
final TSStatus status =
isTableModelStatement
? executeStatementForTableModel(statement, databaseName)
- : executeStatementForTreeModel(statement);
+ : executeStatementForTreeModel(statement,
getTreeDatabaseName(statement));
// Try to convert data type if the status code is not success. Insert
statements normally return
// above after the first converted execution. The retry path is kept for
load and fallback
@@ -1181,7 +1198,84 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
}
}
- private TSStatus executeStatementForTreeModel(final Statement statement) {
+ private TreeDatabaseCreationResult autoCreateTreeDatabaseIfNecessary(final
String database) {
+ if (database == null
+ || LoadTsFileStatement.getDatabaseLevelByTreeDatabase(database) == null
+ ||
!IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled()) {
+ return TreeDatabaseCreationResult.SKIPPED;
+ }
+ if (autoCreatedTreeDatabases.contains(database)) {
+ return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
+ }
+ if (conflictedTreeDatabases.contains(database)) {
+ return TreeDatabaseCreationResult.CONFLICTED;
+ }
+
+ try {
+ final TSStatus status =
+ AuthorityChecker.getAccessControl()
+ .checkCanCreateDatabaseForTree(getUserEntity(), new
PartialPath(database));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new PipeException(status.getMessage());
+ }
+
+ final DatabaseSchemaStatement statement =
+ new
DatabaseSchemaStatement(DatabaseSchemaStatement.DatabaseSchemaStatementType.CREATE);
+ statement.setDatabasePath(new PartialPath(database));
+ statement.setEnablePrintExceptionLog(false);
+ final DatabaseSchemaTask task = new DatabaseSchemaTask(statement);
+ final ListenableFuture<ConfigTaskResult> future =
+ task.execute(ClusterConfigTaskExecutor.getInstance());
+ final ConfigTaskResult result = future.get();
+ final int statusCode = result.getStatusCode().getStatusCode();
+ if (statusCode == TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ || statusCode ==
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ autoCreatedTreeDatabases.add(database);
+ return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
+ }
+ if (statusCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+ conflictedTreeDatabases.add(database);
+ return TreeDatabaseCreationResult.CONFLICTED;
+ }
+ throw new PipeException(
+ String.format(
+ "Auto create tree database failed: %s, status code: %s",
+ database, result.getStatusCode()));
+ } catch (final IllegalPathException e) {
+ throw new PipeException(String.format("Illegal tree database %s.",
database), e);
+ } catch (final ExecutionException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
+ final Throwable rootCause = getRootCause(e);
+ final int errorCode;
+ if (rootCause instanceof IoTDBException) {
+ errorCode = ((IoTDBException) rootCause).getErrorCode();
+ } else if (rootCause instanceof IoTDBRuntimeException) {
+ errorCode = ((IoTDBRuntimeException) rootCause).getErrorCode();
+ } else {
+ errorCode = TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode();
+ }
+ if (errorCode == TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+ autoCreatedTreeDatabases.add(database);
+ return TreeDatabaseCreationResult.CREATED_OR_EXISTED;
+ }
+ if (errorCode == TSStatusCode.DATABASE_CONFLICT.getStatusCode()) {
+ conflictedTreeDatabases.add(database);
+ return TreeDatabaseCreationResult.CONFLICTED;
+ }
+ throw new PipeException(
+ DataNodePipeMessages.AUTO_CREATE_DATABASE_FAILED_BECAUSE +
e.getMessage());
+ }
+ }
+
+ private TSStatus executeStatementForTreeModel(
+ final Statement statement, final String databaseName) {
+ if (autoCreateTreeDatabaseIfNecessary(databaseName) ==
TreeDatabaseCreationResult.CONFLICTED) {
+ // Continue execution, but let partition analysis infer the
receiver-side database.
+ clearTreeDatabaseName(statement);
+ }
+
return Coordinator.getInstance()
.executeForTreeModel(
shouldMarkAsPipeRequest.get() ? new
PipeEnrichedStatement(statement) : statement,
@@ -1196,6 +1290,53 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
.status;
}
+ private IAuditEntity getUserEntity() {
+ return userEntity != null
+ ? userEntity
+ : AuthorityChecker.createIAuditEntity(username,
SESSION_MANAGER.getCurrSession());
+ }
+
+ private String getTreeDatabaseName(final Statement statement) {
+ if (statement instanceof LoadTsFileStatement) {
+ return ((LoadTsFileStatement) statement).getDatabase();
+ }
+ if (statement instanceof InsertBaseStatement) {
+ return ((InsertBaseStatement) statement).getDatabaseName().orElse(null);
+ }
+ return null;
+ }
+
+ static void clearTreeDatabaseName(final Statement statement) {
+ if (statement instanceof LoadTsFileStatement) {
+ final LoadTsFileStatement loadTsFileStatement = (LoadTsFileStatement)
statement;
+ loadTsFileStatement.setDatabase(null);
+ loadTsFileStatement.setDatabaseLevel(
+ IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel());
+ } else if (statement instanceof InsertBaseStatement) {
+ clearTreeInsertDatabaseName((InsertBaseStatement) statement);
+ }
+ }
+
+ private static void clearTreeInsertDatabaseName(final InsertBaseStatement
statement) {
+ statement.setDatabaseName(null);
+ if (statement instanceof InsertRowsStatement) {
+ for (final InsertBaseStatement childStatement :
+ ((InsertRowsStatement) statement).getInsertRowStatementList()) {
+ childStatement.setDatabaseName(null);
+ }
+ } else if (statement instanceof InsertRowsOfOneDeviceStatement) {
+ for (final InsertBaseStatement childStatement :
+ ((InsertRowsOfOneDeviceStatement)
statement).getInsertRowStatementList()) {
+ childStatement.setDatabaseName(null);
+ }
+ } else if (statement instanceof InsertMultiTabletsStatement) {
+ for (final InsertBaseStatement childStatement :
+ ((InsertMultiTabletsStatement)
statement).getInsertTabletStatementList()) {
+ childStatement.setDatabaseName(null);
+ }
+ }
+ }
+
private TSStatus executeStatementForTableModelWithPermissionCheck(
final
org.apache.iotdb.commons.queryengine.plan.relational.sql.ast.Statement
statement,
final String databaseName) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
index e78e273fc0c..f8c6eadc5f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java
@@ -105,12 +105,15 @@ public class PipeTreeStatementDataTypeConvertExecutionVisitor
new TsFileInsertionEventScanParser(
file, new IoTDBTreePattern(null), Long.MIN_VALUE,
Long.MAX_VALUE, null, null, true)) {
for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ final InsertTabletStatement insertTabletStatement =
+ PipeTransferTabletRawReq.toTPipeTransferRawReq(
+ tabletWithIsAligned.getLeft(),
tabletWithIsAligned.getRight())
+ .constructStatement();
+ if (loadTsFileStatement.getDatabase() != null) {
+
insertTabletStatement.setDatabaseName(loadTsFileStatement.getDatabase());
+ }
final PipeConvertedInsertTabletStatement statement =
- new PipeConvertedInsertTabletStatement(
- PipeTransferTabletRawReq.toTPipeTransferRawReq(
- tabletWithIsAligned.getLeft(),
tabletWithIsAligned.getRight())
- .constructStatement(),
- false);
+ new PipeConvertedInsertTabletStatement(insertTabletStatement,
false);
TSStatus result;
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 6ca3164a3e1..e2db158475a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -652,6 +652,7 @@ public class LoadTsFileScheduler implements IScheduler {
.setConvertOnTypeMismatch(true);
if (database != null) {
statement.setDatabase(database);
+ statement.updateDatabaseLevelByTreeDatabase();
}
if (isGeneratedByPipe) {
statement.markIsGeneratedByPipe();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
index 7f9197dfa04..8f2e86c62d0 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiverTest.java
@@ -20,6 +20,10 @@
package org.apache.iotdb.db.pipe.receiver.protocol.thrift;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator;
@@ -29,6 +33,8 @@ import org.junit.Test;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
public class IoTDBDataNodeReceiverTest {
@@ -150,4 +156,53 @@ public class IoTDBDataNodeReceiverTest {
Files.deleteIfExists(tsFile);
}
}
+
+ @Test
+ public void testClearTreeDatabaseNameForLoadTsFileStatement() throws
Exception {
+ final Path tsFile = Files.createTempFile("pipe-load-clear-tree-database",
".tsfile");
+ try {
+ final LoadTsFileStatement statement =
+ IoTDBDataNodeReceiver.buildLoadTsFileStatementForSync(
+ "root.test.sg_0", tsFile.toString(), true, true);
+
+ IoTDBDataNodeReceiver.clearTreeDatabaseName(statement);
+
+ Assert.assertNull(statement.getDatabase());
+ Assert.assertEquals(
+ IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(),
+ statement.getDatabaseLevel());
+ } finally {
+ Files.deleteIfExists(tsFile);
+ }
+ }
+
+ @Test
+ public void testClearTreeDatabaseNameForBatchInsertStatements() {
+ final InsertRowStatement rowStatement1 = new InsertRowStatement();
+ rowStatement1.setDatabaseName("root.test.sg_0");
+ final InsertRowStatement rowStatement2 = new InsertRowStatement();
+ rowStatement2.setDatabaseName("root.test.sg_0");
+ final InsertRowsStatement insertRowsStatement = new InsertRowsStatement();
+ insertRowsStatement.setDatabaseName("root.test.sg_0");
+ insertRowsStatement.setInsertRowStatementList(Arrays.asList(rowStatement1,
rowStatement2));
+
+ IoTDBDataNodeReceiver.clearTreeDatabaseName(insertRowsStatement);
+
+ Assert.assertFalse(insertRowsStatement.getDatabaseName().isPresent());
+ Assert.assertFalse(rowStatement1.getDatabaseName().isPresent());
+ Assert.assertFalse(rowStatement2.getDatabaseName().isPresent());
+
+ final InsertTabletStatement tabletStatement = new InsertTabletStatement();
+ tabletStatement.setDatabaseName("root.test.sg_0");
+ final InsertMultiTabletsStatement insertMultiTabletsStatement =
+ new InsertMultiTabletsStatement();
+ insertMultiTabletsStatement.setDatabaseName("root.test.sg_0");
+ insertMultiTabletsStatement.setInsertTabletStatementList(
+ Collections.singletonList(tabletStatement));
+
+ IoTDBDataNodeReceiver.clearTreeDatabaseName(insertMultiTabletsStatement);
+
+
Assert.assertFalse(insertMultiTabletsStatement.getDatabaseName().isPresent());
+ Assert.assertFalse(tabletStatement.getDatabaseName().isPresent());
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 2db41c2ccb0..19d97490c81 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.junit.Assert;
import org.junit.Before;
@@ -35,6 +36,9 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import java.io.File;
+import java.lang.reflect.Method;
+
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
@@ -87,4 +91,31 @@ public class LoadTsFileSchedulerTest {
Assert.assertEquals("test",
LoadTsFileScheduler.getPartitionQueryDatabase(node, false));
}
+
+ @Test
+ public void testBuildRetryTreeLoadStatementUpdatesDatabaseLevel() throws
Exception {
+ final LoadTsFileScheduler scheduler =
+ new LoadTsFileScheduler(
+ distributedQueryPlan,
+ mock(MPPQueryContext.class),
+ mock(QueryStateMachine.class),
+ mock(IClientManager.class),
+ mock(IPartitionFetcher.class),
+ true);
+ final Method method =
+ LoadTsFileScheduler.class.getDeclaredMethod(
+ "buildRetryTreeLoadStatement", String.class, boolean.class,
String.class);
+ method.setAccessible(true);
+
+ final File tsFile = File.createTempFile("test", ".tsfile");
+ tsFile.deleteOnExit();
+
+ final LoadTsFileStatement statement =
+ (LoadTsFileStatement)
+ method.invoke(scheduler, tsFile.getAbsolutePath(), true,
"root.test.sg_0");
+
+ Assert.assertEquals("root.test.sg_0", statement.getDatabase());
+ Assert.assertEquals(2, statement.getDatabaseLevel());
+ Assert.assertTrue(statement.isGeneratedByPipe());
+ }
}