This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5f956e18ae7 Pipe Load: Bound database for pipe-generated tree model
load (#17661)
5f956e18ae7 is described below
commit 5f956e18ae705f506f3730df6e8fc8f1fdb17d8c
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 22:02:27 2026 +0800
Pipe Load: Bound database for pipe-generated tree model load (#17661)
* fix
* xb
* Update LoadTsFileScheduler.java
---
.../IoTDBPipeReceiverAutoCreateDisabledIT.java | 141 +++++++++++++++++++++
.../plan/scheduler/load/LoadTsFileScheduler.java | 41 ++++--
.../scheduler/load/LoadTsFileSchedulerTest.java | 20 +++
3 files changed, 192 insertions(+), 10 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
new file mode 100644
index 00000000000..6d4f2b80b33
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeReceiverAutoCreateDisabledIT.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.dual.treemodel.auto.basic;
+
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.MultiEnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoBasic;
+import
org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.HashSet;
+import java.util.Set;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2DualTreeAutoBasic.class})
+public class IoTDBPipeReceiverAutoCreateDisabledIT extends
AbstractPipeDualTreeModelAutoIT {
+
+ @Override
+ @Before
+ public void setUp() {
+ MultiEnvFactory.createEnv(2);
+ senderEnv = MultiEnvFactory.getEnv(0);
+ receiverEnv = MultiEnvFactory.getEnv(1);
+ setupConfig();
+ senderEnv.initClusterEnvironment(1, 1);
+ receiverEnv.initClusterEnvironment(1, 1);
+ }
+
+ @Override
+ protected void setupConfig() {
+ super.setupConfig();
+
receiverEnv.getConfig().getCommonConfig().setAutoCreateSchemaEnabled(false);
+ }
+
+ @Test
+ public void testReceiverAutoCreateSchemaDisabledWithSpecialTimeSeries()
throws Exception {
+ Assert.assertEquals(1, senderEnv.getConfigNodeWrapperList().size());
+ Assert.assertEquals(1, senderEnv.getDataNodeWrapperList().size());
+ Assert.assertEquals(1, receiverEnv.getConfigNodeWrapperList().size());
+ Assert.assertEquals(1, receiverEnv.getDataNodeWrapperList().size());
+
+ final String createPipeSql =
+ String.format(
+ "create pipe test with source
('inclusion'='all','source.realtime.mode'='stream','source.realtime.enable'='true')
"
+ + "with sink ('sink'='iotdb-thrift-sink',
'sink.node-urls'='%s');",
+ receiverEnv.getDataNodeWrapper(0).getIpAndPortString());
+ final String createDatabaseSql = "create database root.test.sg;";
+ final String createFirstTimeSeriesSql =
+ "create timeseries
root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`.`~!@#$%^&*()_+=:'\"/|[]{}` float;";
+ final String insertFirstSql =
+ "insert into root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`(time,
`~!@#$%^&*()_+=:'\"/|[]{}`) "
+ + "values (1706659200,3.5),(1706660000, 15.5);";
+ final String firstSelectSql =
+ "select `~!@#$%^&*()_+=:'\"/|[]{}` from
root.test.sg.`1~!@#$%^&*()_+=:'\"/|[]{}`;";
+ final String createSecondTimeSeriesSql =
+ "create timeseries
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`
int32;";
+ final String insertSecondSql =
+ "insert into
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`"
+ + "(time, `~!@#$%^&*().,<>?_-+=:'\"/|[]{}`) values
(1706666400,23456),(1706667400,23456),(1706686400,23456);";
+ final String secondSelectSql =
+ "select * from
root.test.ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz1.`~!@#$%^&*().,<>?_-+=:'\"/|[]{}`;";
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(createPipeSql);
+ statement.execute(createDatabaseSql);
+ statement.execute(createFirstTimeSeriesSql);
+ statement.execute(insertFirstSql);
+ final QueryResult firstQueryResult = queryForResult(statement,
firstSelectSql);
+ statement.execute(createSecondTimeSeriesSql);
+ statement.execute(insertSecondSql);
+ final QueryResult secondQueryResult = queryForResult(statement,
secondSelectSql);
+
+ awaitUntilFlush(senderEnv);
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, firstSelectSql, firstQueryResult.header,
firstQueryResult.rows);
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv, secondSelectSql, secondQueryResult.header,
secondQueryResult.rows);
+ }
+ }
+
+ private QueryResult queryForResult(final Statement statement, final String
sql)
+ throws SQLException {
+ try (final ResultSet resultSet = statement.executeQuery(sql)) {
+ final ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ final StringBuilder headerBuilder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ headerBuilder.append(resultSetMetaData.getColumnName(i)).append(",");
+ }
+
+ final Set<String> rows = new HashSet<>();
+ while (resultSet.next()) {
+ final StringBuilder rowBuilder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ rowBuilder.append(resultSet.getString(i)).append(",");
+ }
+ rows.add(rowBuilder.toString());
+ }
+ return new QueryResult(headerBuilder.toString(), rows);
+ }
+ }
+
+ private static class QueryResult {
+ private final String header;
+ private final Set<String> rows;
+
+ private QueryResult(final String header, final Set<String> rows) {
+ this.header = header;
+ this.rows = rows;
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 6671dad3548..6251a67689a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -86,6 +86,7 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -179,11 +180,7 @@ public class LoadTsFileScheduler implements IScheduler {
final LoadSingleTsFileNode node = tsFileNodeList.get(i);
final String filePath = node.getTsFileResource().getTsFilePath();
- if (node.isTableModel()) {
- partitionFetcher.setDatabase(node.getDatabase());
- } else {
- partitionFetcher.setDatabase(null);
- }
+ partitionFetcher.setDatabase(getPartitionQueryDatabase(node,
isGeneratedByPipe));
boolean isLoadSingleTsFileSuccess = true;
boolean shouldRemoveFileFromLoadingSet = false;
@@ -593,9 +590,10 @@ public class LoadTsFileScheduler implements IScheduler {
.orElse(null)
: loadTsFileDataTypeConverter
.convertForTreeModel(
- LoadTsFileStatement.createUnchecked(filePath)
- .setDeleteAfterLoad(failedNode.isDeleteAfterLoad())
- .setConvertOnTypeMismatch(true))
+ buildRetryTreeLoadStatement(
+ filePath,
+ failedNode.isDeleteAfterLoad(),
+ getPartitionQueryDatabase(failedNode,
isGeneratedByPipe)))
.orElse(null);
if (loadTsFileDataTypeConverter.isSuccessful(status)) {
@@ -634,6 +632,27 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
+ static String getPartitionQueryDatabase(
+ final LoadSingleTsFileNode node, final boolean isGeneratedByPipe) {
+ return node.isTableModel() || isGeneratedByPipe ? node.getDatabase() :
null;
+ }
+
+ private LoadTsFileStatement buildRetryTreeLoadStatement(
+ final String filePath, final boolean deleteAfterLoad, final String
database)
+ throws FileNotFoundException {
+ final LoadTsFileStatement statement =
+ LoadTsFileStatement.createUnchecked(filePath)
+ .setDeleteAfterLoad(deleteAfterLoad)
+ .setConvertOnTypeMismatch(true);
+ if (database != null) {
+ statement.setDatabase(database);
+ }
+ if (isGeneratedByPipe) {
+ statement.markIsGeneratedByPipe();
+ }
+ return statement;
+ }
+
@Override
public void stop(Throwable t) {
// Do nothing
@@ -851,7 +870,8 @@ public class LoadTsFileScheduler implements IScheduler {
subSlotList.stream()
.map(
pair ->
- // (database != null) means this file will be loaded
into table-model
+ // database is an explicit database hint for
table-model loads and
+ // pipe-generated tree-model loads.
database != null
? dataPartition.getDataRegionReplicaSetForWriting(
pair.left, pair.right, database)
@@ -874,7 +894,8 @@ public class LoadTsFileScheduler implements IScheduler {
entry -> {
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(entry.getKey(), new
ArrayList<>(entry.getValue()));
- // (database != null) means this file will be loaded into
table-model
+ // database is an explicit database hint for table-model loads
and
+ // pipe-generated tree-model loads.
if (database != null) {
queryParam.setDatabaseName(database);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
index 161a95b4ec9..2db41c2ccb0 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileSchedulerTest.java
@@ -27,6 +27,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
import org.junit.Assert;
import org.junit.Before;
@@ -67,4 +68,23 @@ public class LoadTsFileSchedulerTest {
Assert.assertNull(t.getTotalCpuTime());
Assert.assertNull(t.getFragmentInfo());
}
+
+ @Test
+ public void testGetPartitionQueryDatabaseForPipeGeneratedTreeModelLoad() {
+ final LoadSingleTsFileNode node = mock(LoadSingleTsFileNode.class);
+ when(node.isTableModel()).thenReturn(false);
+ when(node.getDatabase()).thenReturn("root.test.sg");
+
+ Assert.assertEquals("root.test.sg",
LoadTsFileScheduler.getPartitionQueryDatabase(node, true));
+ Assert.assertNull(LoadTsFileScheduler.getPartitionQueryDatabase(node,
false));
+ }
+
+ @Test
+ public void testGetPartitionQueryDatabaseForTableModelLoad() {
+ final LoadSingleTsFileNode node = mock(LoadSingleTsFileNode.class);
+ when(node.isTableModel()).thenReturn(true);
+ when(node.getDatabase()).thenReturn("test");
+
+ Assert.assertEquals("test",
LoadTsFileScheduler.getPartitionQueryDatabase(node, false));
+ }
}