This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 769f10374dd690d76832028bd779bf6f0b8a91e4 Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Nov 20 18:57:17 2025 +0800 Load: Fixed the issue of TSFile parent directory being null and TSFile resource being updated during the Load process. (#16751) * Load: Fixed the issue of TSFile parent directory being null and TSFile resource being updated during the Load process. * add IT * fix * FIX * update * update (cherry picked from commit 0f2483fbe2ee16c4f34371417ad49b2dff71ce8d) --- .../org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java | 43 ++++++++++++++++++++++ .../scheduler/load/LoadTsFileDispatcherImpl.java | 12 ++++-- .../plan/statement/crud/LoadTsFileStatement.java | 2 +- .../db/storageengine/dataregion/DataRegion.java | 6 ++- .../dataregion/tsfile/TsFileResource.java | 43 +++++++++++++++++++++- .../db/storageengine/load/disk/MinIOSelector.java | 10 +++-- 6 files changed, 105 insertions(+), 11 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java index 36a94174751..c54adb34711 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.it.utils.TsFileGenerator; import org.apache.iotdb.itbase.category.ClusterIT; @@ -67,6 +68,7 @@ import static org.apache.iotdb.db.it.utils.TestUtils.createUser; import static org.apache.iotdb.db.it.utils.TestUtils.executeNonQuery; import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSeriesPrivilege; import static org.apache.iotdb.db.it.utils.TestUtils.grantUserSystemPrivileges; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.USER_DIR; @RunWith(IoTDBTestRunner.class) @Category({LocalStandaloneIT.class, ClusterIT.class}) @@ -742,6 +744,47 @@ public class IoTDBLoadTsFileIT { } } + @Test + public void testLoadWithRelativePathName() throws Exception { + DataNodeWrapper dataNodeWrapper = EnvFactory.getEnv().getDataNodeWrapper(0); + + registerSchema(); + + final long writtenPoint1; + // device 0, device 1, sg 0 + File relativePathFile = new File(System.getProperty(USER_DIR), "1-0-0-0.tsfile"); + try { + try (final TsFileGenerator generator = new TsFileGenerator(relativePathFile)) { + generator.registerTimeseries( + SchemaConfig.DEVICE_0, Collections.singletonList(SchemaConfig.MEASUREMENT_00)); + generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL / 10_000, false); + writtenPoint1 = generator.getTotalNumber(); + } + + try (final Connection connection = + EnvFactory.getEnv().getConnectionWithSpecifiedDataNode(dataNodeWrapper); + final Statement statement = connection.createStatement()) { + + statement.execute(String.format("load \"%s\" sglevel=2", "1-0-0-0.tsfile")); + + try (final ResultSet resultSet = + statement.executeQuery("select count(*) from root.sg.** group by level=1,2")) { + if (resultSet.next()) { + final long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)"); + Assert.assertEquals(writtenPoint1, sg1Count); + } else { + Assert.fail("This ResultSet is empty."); + } + } + } + + } finally { + if (relativePathFile.exists()) { + relativePathFile.delete(); + } + } + } + @Test public void testLoadWithMods() throws Exception { final long writtenPoint1; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java index cfe6bb1a1a6..7301ce0f406 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java @@ -170,15 +170,22 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { } } else if (planNode instanceof LoadSingleTsFileNode) { // do not need to split final TsFileResource tsFileResource = ((LoadSingleTsFileNode) planNode).getTsFileResource(); + final String filePath = tsFileResource.getTsFile().getAbsolutePath(); try { PipeDataNodeAgent.runtime().assignProgressIndexForTsFileLoad(tsFileResource); tsFileResource.setGeneratedByPipe(isGeneratedByPipe); tsFileResource.serialize(); + TsFileResource cloneTsFileResource = null; + try { + cloneTsFileResource = tsFileResource.shallowCloneForNative(); + } catch (CloneNotSupportedException e) { + cloneTsFileResource = tsFileResource.shallowClone(); + } StorageEngine.getInstance() .getDataRegion((DataRegionId) groupId) .loadNewTsFile( - tsFileResource, + cloneTsFileResource, ((LoadSingleTsFileNode) planNode).isDeleteAfterLoad(), isGeneratedByPipe, false); @@ -189,8 +196,7 @@ public class LoadTsFileDispatcherImpl implements IFragInstanceDispatcher { resultStatus.setMessage(e.getMessage()); throw new FragmentInstanceDispatchException(resultStatus); } catch (IOException e) { - LOGGER.warn( - "Serialize TsFileResource {} error.", tsFileResource.getTsFile().getAbsolutePath(), e); + LOGGER.warn("Serialize TsFileResource {} error.", filePath, e); TSStatus resultStatus = new TSStatus(); resultStatus.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode()); resultStatus.setMessage(e.getMessage()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java index 2d74925971c..d1dff1bb9cf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java @@ -70,7 +70,7 @@ public class LoadTsFileStatement extends Statement { private List<Long> writePointCountList; public LoadTsFileStatement(String filePath) throws FileNotFoundException { - this.file = new File(filePath); + this.file = new File(filePath).getAbsoluteFile(); this.databaseLevel = IoTDBDescriptor.getInstance().getConfig().getDefaultDatabaseLevel(); this.verifySchema = true; this.deleteAfterLoad = false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 2c0c39032c4..43c395abc12 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3549,10 +3549,12 @@ public class DataRegion implements IDataRegionForQuery { final boolean isGeneratedByPipe, final boolean isFromConsensus) throws LoadFileException { - final File tsfileToBeInserted = newTsFileResource.getTsFile(); + final File tsfileToBeInserted = newTsFileResource.getTsFile().getAbsoluteFile(); final long newFilePartitionId = newTsFileResource.getTimePartitionWithCheck(); - if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource)) { + if (!TsFileValidator.getInstance().validateTsFile(newTsFileResource) + || !tsfileToBeInserted.exists() + || tsfileToBeInserted.getParentFile() == null) { throw new LoadFileException( "tsfile validate failed, " + newTsFileResource.getTsFile().getName()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index 35ea1660fde..3169cf85ccb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -96,7 +96,7 @@ import static org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR; import static org.apache.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX; @SuppressWarnings("java:S1135") // ignore todos -public class TsFileResource implements PersistentResource { +public class TsFileResource implements PersistentResource, Cloneable { private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TsFileResource.class) @@ -1585,4 +1585,45 @@ public class TsFileResource implements PersistentResource { public void setLastValues(Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues) { this.lastValues = lastValues; } + + public TsFileResource shallowClone() { + TsFileResource cloned = new TsFileResource(); + cloned.file = this.file; + cloned.timeIndex = this.timeIndex; + cloned.maxPlanIndex = this.maxPlanIndex; + cloned.minPlanIndex = this.minPlanIndex; + cloned.exclusiveModFileFuture = this.exclusiveModFileFuture; + cloned.sharedModFilePathFuture = this.sharedModFilePathFuture; + cloned.modFileManagement = this.modFileManagement; + cloned.exclusiveModFile = this.exclusiveModFile; + cloned.sharedModFile = this.sharedModFile; + cloned.sharedModFileOffset = this.sharedModFileOffset; + cloned.compactionModFile = this.compactionModFile; + cloned.isSeq = this.isSeq; + cloned.tsFileRepairStatus = this.tsFileRepairStatus; + cloned.settleTsFileCallBack = this.settleTsFileCallBack; + cloned.deviceTimeIndexRamSize = this.deviceTimeIndexRamSize; + cloned.tsFileSize = this.tsFileSize; + cloned.processor = this.processor; + cloned.originTsFileResource = this.originTsFileResource; + cloned.isGeneratedByPipeConsensus = this.isGeneratedByPipeConsensus; + cloned.isGeneratedByPipe = this.isGeneratedByPipe; + cloned.insertionCompactionCandidateStatus = this.insertionCompactionCandidateStatus; + cloned.tierLevel = this.tierLevel; + cloned.pathToChunkMetadataListMap = this.pathToChunkMetadataListMap; + cloned.pathToReadOnlyMemChunkMap = this.pathToReadOnlyMemChunkMap; + cloned.pathToTimeSeriesMetadataMap = this.pathToTimeSeriesMetadataMap; + cloned.lastValues = this.lastValues; + cloned.maxProgressIndex.set(this.maxProgressIndex.get()); + cloned.atomicStatus.set(this.atomicStatus.get()); + cloned.isEmpty.set(this.isEmpty.get()); + cloned.tsFileID = this.tsFileID; + cloned.prev = null; + cloned.next = null; + return cloned; + } + + public TsFileResource shallowCloneForNative() throws CloneNotSupportedException { + return (TsFileResource) clone(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java index 9956148b083..7ec8300058a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java @@ -81,10 +81,12 @@ public class MinIOSelector extends InheritSystemMultiDisksStrategySelector { throws DiskSpaceInsufficientException, LoadFileException { String fileDirRoot = null; try { - fileDirRoot = - Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath())) - .map(Object::toString) - .orElse(null); + if (sourceDirectory != null) { + fileDirRoot = + Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath())) + .map(Object::toString) + .orElse(null); + } } catch (Exception e) { logger.warn( "Exception occurs when reading target file's mount point {}",
