This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch force_ci/object_type in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fe49c5ee0806f57a30cfff5c86d61fb653a51203 Author: Zhenyu Luo <[email protected]> AuthorDate: Thu Nov 20 18:10:19 2025 +0800 Load: Active Load supports ModV2 (#16769) (cherry picked from commit f708b96623861172ed3a4cf5a6d29433565549ef) --- .../protocol/thrift/IoTDBDataNodeReceiver.java | 4 +- .../plan/analyze/load/LoadTsFileAnalyzer.java | 5 +-- .../plan/node/load/LoadSingleTsFileNode.java | 6 +-- .../load/active/ActiveLoadDirScanner.java | 20 ++------- .../load/active/ActiveLoadTsFileLoader.java | 6 ++- ...leStatementDataTypeConvertExecutionVisitor.java | 10 ++--- ...eeStatementDataTypeConvertExecutionVisitor.java | 10 ++--- .../ActiveLoadUtil.java => util/LoadUtil.java} | 49 +++++++++++++++++++--- 8 files changed, 65 insertions(+), 45 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 652530d11b4..4d4bf7b3926 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -101,7 +101,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; -import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; +import org.apache.iotdb.db.storageengine.load.util.LoadUtil; import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.db.tools.schema.SRStatementGenerator; @@ -580,7 +580,7 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver { null, shouldMarkAsPipeRequest.get()); - if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) { + if (!LoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) { throw new PipeException("Load active listening pipe dir is not set."); } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 076b529ab91..8202de5c499 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -43,9 +43,9 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; -import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; +import org.apache.iotdb.db.storageengine.load.util.LoadUtil; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; @@ -291,8 +291,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable { tabletConversionThresholdBytes, isGeneratedByPipe); - if (ActiveLoadUtil.loadTsFileAsyncToActiveDir( - tsFiles, activeLoadAttributes, isDeleteAfterLoad)) { + if (LoadUtil.loadTsFileAsyncToActiveDir(tsFiles, activeLoadAttributes, isDeleteAfterLoad)) { analysis.setFinishQueryAfterAnalyze(true); setRealStatement(analysis); return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java index c8691868c82..604fda6e1e8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java @@ -30,8 +30,8 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode; import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; -import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.util.LoadUtil; import org.apache.tsfile.exception.NotImplementedException; import org.apache.tsfile.file.metadata.IDeviceID; @@ -229,10 +229,10 @@ public class LoadSingleTsFileNode extends WritePlanNode { if (deleteAfterLoad) { Files.deleteIfExists(tsFile.toPath()); Files.deleteIfExists( - new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).toPath()); + new File(LoadUtil.getTsFileResourcePath(tsFile.getAbsolutePath())).toPath()); Files.deleteIfExists(ModificationFile.getExclusiveMods(tsFile).toPath()); Files.deleteIfExists( - new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).toPath()); + new File(LoadUtil.getTsFileModsV1Path(tsFile.getAbsolutePath())).toPath()); } } catch (final IOException e) { LOGGER.warn("Delete After Loading {} error.", tsFile, e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java index 470cf702b22..cedc4d5e29f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName; import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet; import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet; +import org.apache.iotdb.db.storageengine.load.util.LoadUtil; import org.apache.tsfile.common.conf.TSFileConfig; import org.apache.tsfile.external.commons.io.FileUtils; @@ -51,9 +52,6 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService { private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadDirScanner.class); - private static final String RESOURCE = ".resource"; - private static final String MODS = ".mods"; - private final AtomicReference<String[]> listeningDirsConfig = new AtomicReference<>(); private final Set<String> listeningDirs = new CopyOnWriteArraySet<>(); @@ -110,11 +108,7 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService { fileStream .filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file)) .filter(File::exists) - .map( - file -> - (file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS)) - ? getTsFilePath(file.getAbsolutePath()) - : file.getAbsolutePath()) + .map(file -> LoadUtil.getTsFilePath(file.getAbsolutePath())) .filter(this::isTsFileCompleted) .limit(currentAllowedPendingSize) .forEach( @@ -202,7 +196,7 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService { listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs()); listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs())); - ActiveLoadUtil.updateLoadDiskSelector(); + LoadUtil.updateLoadDiskSelector(); } } } @@ -235,14 +229,6 @@ public class ActiveLoadDirScanner extends ActiveLoadScheduledExecutorService { } } - private static String getTsFilePath(final String filePathWithResourceOrModsTail) { - return filePathWithResourceOrModsTail.endsWith(RESOURCE) - ? filePathWithResourceOrModsTail.substring( - 0, filePathWithResourceOrModsTail.length() - RESOURCE.length()) - : filePathWithResourceOrModsTail.substring( - 0, filePathWithResourceOrModsTail.length() - MODS.length()); - } - // Metrics public long countAndReportActiveListeningDirsFileNumber() { long totalFileCount = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java index 20817c94146..0e565afd70c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java @@ -40,6 +40,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet; import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet; +import org.apache.iotdb.db.storageengine.load.util.LoadUtil; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.external.commons.io.FileUtils; @@ -297,8 +298,9 @@ public class ActiveLoadTsFileLoader { private void removeFileAndResourceAndModsToFailDir(final String filePath) { removeToFailDir(filePath); - removeToFailDir(filePath + ".resource"); - removeToFailDir(filePath + ".mods"); + removeToFailDir(LoadUtil.getTsFileResourcePath(filePath)); + removeToFailDir(LoadUtil.getTsFileModsV1Path(filePath)); + removeToFailDir(LoadUtil.getTsFileModsV2Path(filePath)); } private void removeToFailDir(final String filePath) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java index 9a6be9737af..a011490ca49 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java @@ -27,9 +27,7 @@ import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTable import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile; import org.apache.iotdb.db.queryengine.plan.statement.Statement; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; -import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.util.LoadUtil; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.rpc.TSStatusCode; @@ -125,9 +123,9 @@ public class LoadTableStatementDataTypeConvertExecutionVisitor tsfile -> { FileUtils.deleteQuietly(tsfile); final String tsFilePath = tsfile.getAbsolutePath(); - FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX)); + FileUtils.deleteQuietly(new File(LoadUtil.getTsFileResourcePath(tsFilePath))); + FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV1Path(tsFilePath))); + FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV2Path(tsFilePath))); }); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index 2999d436c95..226966454aa 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -29,11 +29,9 @@ import org.apache.iotdb.db.queryengine.plan.statement.StatementNode; import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; -import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; -import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; -import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock; import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager; +import org.apache.iotdb.db.storageengine.load.util.LoadUtil; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.tsfile.external.commons.io.FileUtils; @@ -178,9 +176,9 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor tsfile -> { FileUtils.deleteQuietly(tsfile); final String tsFilePath = tsfile.getAbsolutePath(); - FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX)); - FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX)); + FileUtils.deleteQuietly(new File(LoadUtil.getTsFileResourcePath(tsFilePath))); + FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV1Path(tsFilePath))); + FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV2Path(tsFilePath))); }); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java similarity index 74% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java index e3dbe43507d..a3d29337b86 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java @@ -17,11 +17,15 @@ * under the License. */ -package org.apache.iotdb.db.storageengine.load.active; +package org.apache.iotdb.db.storageengine.load.util; import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; +import org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper; import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector; import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; @@ -40,9 +44,9 @@ import java.util.Objects; import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check; import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check; -public class ActiveLoadUtil { +public class LoadUtil { - private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadUtil.class); + private static final Logger LOGGER = LoggerFactory.getLogger(LoadUtil.class); private static volatile ILoadDiskSelector loadDiskSelector = updateLoadDiskSelector(); @@ -68,6 +72,37 @@ public class ActiveLoadUtil { return true; } + public static String getTsFilePath(final String filePathWithResourceOrModsTail) { + if (filePathWithResourceOrModsTail.endsWith(TsFileResource.RESOURCE_SUFFIX)) { + return filePathWithResourceOrModsTail.substring( + 0, filePathWithResourceOrModsTail.length() - TsFileResource.RESOURCE_SUFFIX.length()); + } + + if (filePathWithResourceOrModsTail.endsWith(ModificationFileV1.FILE_SUFFIX)) { + return filePathWithResourceOrModsTail.substring( + 0, filePathWithResourceOrModsTail.length() - ModificationFileV1.FILE_SUFFIX.length()); + } + + if (filePathWithResourceOrModsTail.endsWith(ModificationFile.FILE_SUFFIX)) { + return filePathWithResourceOrModsTail.substring( + 0, filePathWithResourceOrModsTail.length() - ModificationFile.FILE_SUFFIX.length()); + } + + return filePathWithResourceOrModsTail; + } + + public static String getTsFileModsV1Path(final String tsFilePath) { + return tsFilePath + ModificationFileV1.FILE_SUFFIX; + } + + public static String getTsFileModsV2Path(final String tsFilePath) { + return tsFilePath + ModificationFile.FILE_SUFFIX; + } + + public static String getTsFileResourcePath(final String tsFilePath) { + return tsFilePath + TsFileResource.RESOURCE_SUFFIX; + } + private static boolean loadTsFilesToActiveDir( final Map<String, String> loadAttributes, final File file, final boolean isDeleteAfterLoad) throws IOException { @@ -93,9 +128,11 @@ public class ActiveLoadUtil { final File targetDir = ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes); loadTsFileAsyncToTargetDir( - targetDir, new File(file.getAbsolutePath() + ".resource"), isDeleteAfterLoad); + targetDir, new File(getTsFileResourcePath(file.getAbsolutePath())), isDeleteAfterLoad); + loadTsFileAsyncToTargetDir( + targetDir, new File(getTsFileModsV1Path(file.getAbsolutePath())), isDeleteAfterLoad); loadTsFileAsyncToTargetDir( - targetDir, new File(file.getAbsolutePath() + ".mods"), isDeleteAfterLoad); + targetDir, new File(getTsFileModsV2Path(file.getAbsolutePath())), isDeleteAfterLoad); loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad); return true; } @@ -182,7 +219,7 @@ public class ActiveLoadUtil { return new File(finalFolderManager.getNextFolder()); }); - ActiveLoadUtil.loadDiskSelector = loadDiskSelector; + LoadUtil.loadDiskSelector = loadDiskSelector; return loadDiskSelector; } }
