This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b9e7bd3aece Load: Fixed the issue that the loading table model file
failed under Tree Dialect Cli (#15942)
b9e7bd3aece is described below
commit b9e7bd3aecee5501a79c05f63bdd241d8a5499dc
Author: Zhenyu Luo <[email protected]>
AuthorDate: Tue Jul 22 14:12:21 2025 +0800
Load: Fixed the issue that the loading table model file failed under Tree
Dialect Cli (#15942)
* Load: Fixed the issue that the loading table model file failed under Tree
Dialect Cli
* add IT
* IT spotless
---
.../manual/basic/IoTDBPipeDataSinkIT.java | 14 ++++++++++----
.../db/queryengine/common/MPPQueryContext.java | 4 ++++
.../plan/analyze/load/LoadTsFileAnalyzer.java | 22 ++++++++++++++++++++++
3 files changed, 36 insertions(+), 4 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java
index 23412f05edb..384caec5c74 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeDataSinkIT.java
@@ -130,20 +130,25 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeTableModelDualManualIT {
@Test
public void testSinkTabletFormat() throws Exception {
- testSinkFormat("tablet");
+ testSinkFormat("tablet", false);
}
@Test
public void testSinkTsFileFormat() throws Exception {
- testSinkFormat("tsfile");
+ testSinkFormat("tsfile", false);
+ }
+
+ @Test
+ public void testTsFileFormatAndAsyncLoad() throws Exception {
+ testSinkFormat("tsfile", true);
}
@Test
public void testSinkHybridFormat() throws Exception {
- testSinkFormat("hybrid");
+ testSinkFormat("hybrid", false);
}
- private void testSinkFormat(final String format) throws Exception {
+ private void testSinkFormat(final String format, final boolean isAsyncLoad)
throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
final String receiverIp = receiverDataNode.getIp();
@@ -179,6 +184,7 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeTableModelDualManualIT {
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port",
Integer.toString(receiverPort));
connectorAttributes.put("connector.format", format);
+ connectorAttributes.put("connector.load-tsfile-strategy", isAsyncLoad ?
"async" : "sync");
connectorAttributes.put("connector.realtime-first", "false");
Assert.assertEquals(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 93768931a49..b4bd87b2e7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -178,6 +178,10 @@ public class MPPQueryContext {
return session;
}
+ public void setSession(SessionInfo session) {
+ this.session = session;
+ }
+
public long getStartTime() {
return startTime;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index f9a02c56799..8ec6bd7bb48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -28,7 +28,9 @@ import
org.apache.iotdb.db.exception.load.LoadAnalyzeException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
import org.apache.iotdb.db.exception.load.LoadEmptyFileException;
import org.apache.iotdb.db.exception.sql.SemanticException;
+import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
@@ -410,6 +412,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
private void analyzeSingleTsFile(final File tsFile, int i) throws Exception {
+ final SessionInfo sessionInfo = context.getSession();
try (final TsFileSequenceReader reader = new
TsFileSequenceReader(tsFile.getAbsolutePath())) {
// check whether the tsfile is tree-model or not
final Map<String, TableSchema> tableSchemaMap =
reader.getTableSchemaMap();
@@ -448,8 +451,24 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
}
if (isTableModelFile) {
+ final SessionInfo newSessionInfo =
+ new SessionInfo(
+ sessionInfo.getSessionId(),
+ sessionInfo.getUserName(),
+ sessionInfo.getZoneId(),
+ sessionInfo.getDatabaseName().orElse(null),
+ IClientSession.SqlDialect.TABLE);
+ context.setSession(newSessionInfo);
doAnalyzeSingleTableFile(tsFile, reader, timeseriesMetadataIterator,
tableSchemaMap);
} else {
+ final SessionInfo newSessionInfo =
+ new SessionInfo(
+ sessionInfo.getSessionId(),
+ sessionInfo.getUserName(),
+ sessionInfo.getZoneId(),
+ sessionInfo.getDatabaseName().orElse(null),
+ IClientSession.SqlDialect.TREE);
+ context.setSession(newSessionInfo);
doAnalyzeSingleTreeFile(tsFile, reader, timeseriesMetadataIterator);
}
} catch (final LoadEmptyFileException loadEmptyFileException) {
@@ -457,6 +476,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
if (isDeleteAfterLoad) {
FileUtils.deleteQuietly(tsFile);
}
+ } finally {
+ // reset the session info to the original one
+ context.setSession(sessionInfo);
}
}