This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 940b9d7327a05192ae80dfe16aecd22d9427ef61 Author: Zhenyu Luo <[email protected]> AuthorDate: Tue Jul 22 14:12:21 2025 +0800 Load: Fixed the issue that the loading table model file failed under Tree Dialect Cli (#15942) * Load: Fixed the issue that the loading table model file failed under Tree Dialect Cli * add IT * IT spotless (cherry picked from commit b9e7bd3aecee5501a79c05f63bdd241d8a5499dc) --- .../manual/basic/IoTDBPipeDataSinkIT.java | 14 ++++++++++---- .../db/queryengine/common/MPPQueryContext.java | 4 ++++ .../plan/analyze/load/LoadTsFileAnalyzer.java | 22 ++++++++++++++++++++++ 3 files changed, 36 insertions(+), 4 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java index 23412f05edb..384caec5c74 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java @@ -130,20 +130,25 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeTableModelDualManualIT { @Test public void testSinkTabletFormat() throws Exception { - testSinkFormat("tablet"); + testSinkFormat("tablet", false); } @Test public void testSinkTsFileFormat() throws Exception { - testSinkFormat("tsfile"); + testSinkFormat("tsfile", false); + } + + @Test + public void testTsFileFormatAndAsyncLoad() throws Exception { + testSinkFormat("tsfile", true); } @Test public void testSinkHybridFormat() throws Exception { - testSinkFormat("hybrid"); + testSinkFormat("hybrid", false); } - private void testSinkFormat(final String format) throws Exception { + private void testSinkFormat(final String format, final boolean isAsyncLoad) throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -179,6 +184,7 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeTableModelDualManualIT { connectorAttributes.put("connector.ip", receiverIp); connectorAttributes.put("connector.port", Integer.toString(receiverPort)); connectorAttributes.put("connector.format", format); + connectorAttributes.put("connector.load-tsfile-strategy", isAsyncLoad ? "async" : "sync"); connectorAttributes.put("connector.realtime-first", "false"); Assert.assertEquals( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 93768931a49..b4bd87b2e7e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -178,6 +178,10 @@ public class MPPQueryContext { return session; } + public void setSession(SessionInfo session) { + this.session = session; + } + public long getStartTime() { return startTime; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index f9a02c56799..8ec6bd7bb48 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -28,7 +28,9 @@ import org.apache.iotdb.db.exception.load.LoadAnalyzeException; import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException; import org.apache.iotdb.db.exception.load.LoadEmptyFileException; import org.apache.iotdb.db.exception.sql.SemanticException; +import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; +import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis; @@ -410,6 +412,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable { } private void analyzeSingleTsFile(final File tsFile, int i) throws Exception { + final SessionInfo sessionInfo = context.getSession(); try (final TsFileSequenceReader reader = new TsFileSequenceReader(tsFile.getAbsolutePath())) { // check whether the tsfile is tree-model or not final Map<String, TableSchema> tableSchemaMap = reader.getTableSchemaMap(); @@ -448,8 +451,24 @@ public class LoadTsFileAnalyzer implements AutoCloseable { } if (isTableModelFile) { + final SessionInfo newSessionInfo = + new SessionInfo( + sessionInfo.getSessionId(), + sessionInfo.getUserName(), + sessionInfo.getZoneId(), + sessionInfo.getDatabaseName().orElse(null), + IClientSession.SqlDialect.TABLE); + context.setSession(newSessionInfo); doAnalyzeSingleTableFile(tsFile, reader, timeseriesMetadataIterator, tableSchemaMap); } else { + final SessionInfo newSessionInfo = + new SessionInfo( + sessionInfo.getSessionId(), + sessionInfo.getUserName(), + sessionInfo.getZoneId(), + sessionInfo.getDatabaseName().orElse(null), + IClientSession.SqlDialect.TREE); + context.setSession(newSessionInfo); doAnalyzeSingleTreeFile(tsFile, reader, timeseriesMetadataIterator); } } catch (final LoadEmptyFileException loadEmptyFileException) { @@ -457,6 +476,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable { if (isDeleteAfterLoad) { FileUtils.deleteQuietly(tsFile); } + } finally { + // reset the session info to the original one + context.setSession(sessionInfo); } }
