This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new c7ee924b88a [To dev/1.3] Load: Fixed the issue of TSFile parent
directory being null and TSFile resource being updated during the Load process.
(#16751) (#16790)
c7ee924b88a is described below
commit c7ee924b88a058e30e5eea118328a244f3ca3fd5
Author: Caideyipi <[email protected]>
AuthorDate: Mon Nov 24 18:27:56 2025 +0800
[To dev/1.3] Load: Fixed the issue of TSFile parent directory being null
and TSFile resource being updated during the Load process. (#16751) (#16790)
---
.../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 43 ++++++++++++++++++++++
.../scheduler/load/LoadTsFileDispatcherImpl.java | 12 ++++--
.../plan/statement/crud/LoadTsFileStatement.java | 2 +-
.../db/storageengine/dataregion/DataRegion.java | 6 ++-
.../dataregion/tsfile/TsFileResource.java | 37 ++++++++++++++++++-
.../db/storageengine/load/disk/MinIOSelector.java | 10 +++--
6 files changed, 99 insertions(+), 11 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
index 97e05f8768c..03743d1bf4a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.it.utils.TsFileGenerator;
import org.apache.iotdb.itbase.category.ClusterIT;
@@ -66,6 +67,7 @@ import static
org.apache.iotdb.db.it.utils.TestUtils.createUser;
import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQuery;
import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSeriesPrivilege;
import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSystemPrivileges;
+import static org.apache.iotdb.it.env.cluster.ClusterConstant.USER_DIR;
@RunWith(IoTDBTestRunner.class)
@Category({LocalStandaloneIT.class, ClusterIT.class})
@@ -929,6 +931,47 @@ public class IoTDBLoadTsFileIT {
}
}
+ @Test
+ public void testLoadWithRelativePathName() throws Exception {
+ DataNodeWrapper dataNodeWrapper =
EnvFactory.getEnv().getDataNodeWrapper(0);
+
+ registerSchema();
+
+ final long writtenPoint1;
+ // device 0, device 1, sg 0
+ File relativePathFile = new File(System.getProperty(USER_DIR),
"1-0-0-0.tsfile");
+ try {
+ try (final TsFileGenerator generator = new
TsFileGenerator(relativePathFile)) {
+ generator.registerTimeseries(
+ SchemaConfig.DEVICE_0,
Collections.singletonList(SchemaConfig.MEASUREMENT_00));
+ generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL /
10_000, false);
+ writtenPoint1 = generator.getTotalNumber();
+ }
+
+ try (final Connection connection =
+
EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper);
+ final Statement statement = connection.createStatement()) {
+
+ statement.execute(String.format("load \"%s\" sglevel=2",
"1-0-0-0.tsfile"));
+
+ try (final ResultSet resultSet =
+ statement.executeQuery("select count(*) from root.sg.** group by
level=1,2")) {
+ if (resultSet.next()) {
+ final long sg1Count =
resultSet.getLong("count(root.sg.test_0.*.*)");
+ Assert.assertEquals(writtenPoint1, sg1Count);
+ } else {
+ Assert.fail("This ResultSet is empty.");
+ }
+ }
+ }
+
+ } finally {
+ if (relativePathFile.exists()) {
+ relativePathFile.delete();
+ }
+ }
+ }
+
@Test
public void testLoadWithMods() throws Exception {
final long writtenPoint1;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 77226615e27..c0703113cdf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -170,15 +170,22 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
}
} else if (planNode instanceof LoadSingleTsFileNode) { // do not need to
split
final TsFileResource tsFileResource = ((LoadSingleTsFileNode)
planNode).getTsFileResource();
+ final String filePath = tsFileResource.getTsFile().getAbsolutePath();
try {
PipeDataNodeAgent.runtime().assignProgressIndexForTsFileLoad(tsFileResource);
tsFileResource.setGeneratedByPipe(isGeneratedByPipe);
tsFileResource.serialize();
+ TsFileResource cloneTsFileResource = null;
+ try {
+ cloneTsFileResource = tsFileResource.shallowCloneForNative();
+ } catch (CloneNotSupportedException e) {
+ cloneTsFileResource = tsFileResource.shallowClone();
+ }
StorageEngine.getInstance()
.getDataRegion((DataRegionId) groupId)
.loadNewTsFile(
- tsFileResource,
+ cloneTsFileResource,
((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(),
isGeneratedByPipe,
false);
@@ -189,8 +196,7 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
resultStatus.setMessage(e.getMessage());
throw new FragmentInstanceDispatchException(resultStatus);
} catch (IOException e) {
- LOGGER.warn(
- "Serialize TsFileResource {} error.",
tsFileResource.getTsFile().getAbsolutePath(), e);
+ LOGGER.warn("Serialize TsFileResource {} error.", filePath, e);
TSStatus resultStatus = new TSStatus();
resultStatus.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
resultStatus.setMessage(e.getMessage());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
index 1ee2bd2cc60..0b837968cde 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java
@@ -56,7 +56,7 @@ public class LoadTsFileStatement extends Statement {
private List<Long> writePointCountList;
public LoadTsFileStatement(String filePath) throws FileNotFoundException {
- this.file = new File(filePath);
+ this.file = new File(filePath).getAbsoluteFile();
this.databaseLevel =
IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel();
this.verifySchema = true;
this.deleteAfterLoad = false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index a16bb396b5c..8e166f02301 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3071,10 +3071,12 @@ public class DataRegion implements IDataRegionForQuery {
final boolean isGeneratedByPipe,
final boolean isFromConsensus)
throws LoadFileException {
- final File tsfileToBeInserted = newTsFileResource.getTsFile();
+ final File tsfileToBeInserted =
newTsFileResource.getTsFile().getAbsoluteFile();
final long newFilePartitionId =
newTsFileResource.getTimePartitionWithCheck();
- if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)) {
+ if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)
+ || !tsfileToBeInserted.exists()
+ || tsfileToBeInserted.getParentFile() == null) {
throw new LoadFileException(
"tsfile validate failed, " +
newTsFileResource.getTsFile().getName());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index be9012780d9..8571fd2e915 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -78,7 +78,7 @@ import static
org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
import static org.apache.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@SuppressWarnings("java:S1135") // ignore todos
-public class TsFileResource {
+public class TsFileResource implements Cloneable {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TsFileResource.class)
@@ -1254,4 +1254,39 @@ public class TsFileResource {
public void setLastValues(Map<IDeviceID, List<Pair<String, TimeValuePair>>>
lastValues) {
this.lastValues = lastValues;
}
+
+ public TsFileResource shallowClone() {
+ TsFileResource cloned = new TsFileResource();
+ cloned.file = this.file;
+ cloned.timeIndex = this.timeIndex;
+ cloned.maxPlanIndex = this.maxPlanIndex;
+ cloned.minPlanIndex = this.minPlanIndex;
+ cloned.compactionModFile = this.compactionModFile;
+ cloned.isSeq = this.isSeq;
+ cloned.tsFileRepairStatus = this.tsFileRepairStatus;
+ cloned.settleTsFileCallBack = this.settleTsFileCallBack;
+ cloned.deviceTimeIndexRamSize = this.deviceTimeIndexRamSize;
+ cloned.tsFileSize = this.tsFileSize;
+ cloned.processor = this.processor;
+ cloned.originTsFileResource = this.originTsFileResource;
+ cloned.isGeneratedByPipeConsensus = this.isGeneratedByPipeConsensus;
+ cloned.isGeneratedByPipe = this.isGeneratedByPipe;
+ cloned.insertionCompactionCandidateStatus =
this.insertionCompactionCandidateStatus;
+ cloned.tierLevel = this.tierLevel;
+ cloned.pathToChunkMetadataListMap = this.pathToChunkMetadataListMap;
+ cloned.pathToReadOnlyMemChunkMap = this.pathToReadOnlyMemChunkMap;
+ cloned.pathToTimeSeriesMetadataMap = this.pathToTimeSeriesMetadataMap;
+ cloned.lastValues = this.lastValues;
+ cloned.maxProgressIndex.set(this.maxProgressIndex.get());
+ cloned.atomicStatus.set(this.atomicStatus.get());
+ cloned.isEmpty.set(this.isEmpty.get());
+ cloned.tsFileID = this.tsFileID;
+ cloned.prev = null;
+ cloned.next = null;
+ return cloned;
+ }
+
+ public TsFileResource shallowCloneForNative() throws
CloneNotSupportedException {
+ return (TsFileResource) clone();
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
index 9956148b083..7ec8300058a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
@@ -81,10 +81,12 @@ public class MinIOSelector extends
InheritSystemMultiDisksStrategySelector {
throws DiskSpaceInsufficientException, LoadFileException {
String fileDirRoot = null;
try {
- fileDirRoot =
-
Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath()))
- .map(Object::toString)
- .orElse(null);
+ if (sourceDirectory != null) {
+ fileDirRoot =
+
Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath()))
+ .map(Object::toString)
+ .orElse(null);
+ }
} catch (Exception e) {
logger.warn(
"Exception occurs when reading target file's mount point {}",