This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch force_ci/object_type
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fe49c5ee0806f57a30cfff5c86d61fb653a51203
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Nov 20 18:10:19 2025 +0800

    Load: Active Load supports ModV2 (#16769)
    
    (cherry picked from commit f708b96623861172ed3a4cf5a6d29433565549ef)
---
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  4 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  5 +--
 .../plan/node/load/LoadSingleTsFileNode.java       |  6 +--
 .../load/active/ActiveLoadDirScanner.java          | 20 ++-------
 .../load/active/ActiveLoadTsFileLoader.java        |  6 ++-
 ...leStatementDataTypeConvertExecutionVisitor.java | 10 ++---
 ...eeStatementDataTypeConvertExecutionVisitor.java | 10 ++---
 .../ActiveLoadUtil.java => util/LoadUtil.java}     | 49 +++++++++++++++++++---
 8 files changed, 65 insertions(+), 45 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 652530d11b4..4d4bf7b3926 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -101,7 +101,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
-import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
@@ -580,7 +580,7 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
             null,
             shouldMarkAsPipeRequest.get());
 
-    if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, 
true)) {
+    if (!LoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
       throw new PipeException("Load active listening pipe dir is not set.");
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 076b529ab91..8202de5c499 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -43,9 +43,9 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
 import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
-import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
 import 
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
 import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -291,8 +291,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
               tabletConversionThresholdBytes,
               isGeneratedByPipe);
 
-      if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
-          tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
+      if (LoadUtil.loadTsFileAsyncToActiveDir(tsFiles, activeLoadAttributes, 
isDeleteAfterLoad)) {
         analysis.setFinishQueryAfterAnalyze(true);
         setRealStatement(analysis);
         return true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index c8691868c82..604fda6e1e8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -30,8 +30,8 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
 import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
 
 import org.apache.tsfile.exception.NotImplementedException;
 import org.apache.tsfile.file.metadata.IDeviceID;
@@ -229,10 +229,10 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       if (deleteAfterLoad) {
         Files.deleteIfExists(tsFile.toPath());
         Files.deleteIfExists(
-            new File(tsFile.getAbsolutePath() + 
TsFileResource.RESOURCE_SUFFIX).toPath());
+            new 
File(LoadUtil.getTsFileResourcePath(tsFile.getAbsolutePath())).toPath());
         
Files.deleteIfExists(ModificationFile.getExclusiveMods(tsFile).toPath());
         Files.deleteIfExists(
-            new File(tsFile.getAbsolutePath() + 
ModificationFileV1.FILE_SUFFIX).toPath());
+            new 
File(LoadUtil.getTsFileModsV1Path(tsFile.getAbsolutePath())).toPath());
       }
     } catch (final IOException e) {
       LOGGER.warn("Delete After Loading {} error.", tsFile, e);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 470cf702b22..cedc4d5e29f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
 import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
 import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.external.commons.io.FileUtils;
@@ -51,9 +52,6 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadDirScanner.class);
 
-  private static final String RESOURCE = ".resource";
-  private static final String MODS = ".mods";
-
   private final AtomicReference<String[]> listeningDirsConfig = new 
AtomicReference<>();
   private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
 
@@ -110,11 +108,7 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
           fileStream
               .filter(file -> 
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
               .filter(File::exists)
-              .map(
-                  file ->
-                      (file.getName().endsWith(RESOURCE) || 
file.getName().endsWith(MODS))
-                          ? getTsFilePath(file.getAbsolutePath())
-                          : file.getAbsolutePath())
+              .map(file -> LoadUtil.getTsFilePath(file.getAbsolutePath()))
               .filter(this::isTsFileCompleted)
               .limit(currentAllowedPendingSize)
               .forEach(
@@ -202,7 +196,7 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
 
               
listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs());
               
listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs()));
-              ActiveLoadUtil.updateLoadDiskSelector();
+              LoadUtil.updateLoadDiskSelector();
             }
           }
         }
@@ -235,14 +229,6 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
     }
   }
 
-  private static String getTsFilePath(final String 
filePathWithResourceOrModsTail) {
-    return filePathWithResourceOrModsTail.endsWith(RESOURCE)
-        ? filePathWithResourceOrModsTail.substring(
-            0, filePathWithResourceOrModsTail.length() - RESOURCE.length())
-        : filePathWithResourceOrModsTail.substring(
-            0, filePathWithResourceOrModsTail.length() - MODS.length());
-  }
-
   // Metrics
   public long countAndReportActiveListeningDirsFileNumber() {
     long totalFileCount = 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 20817c94146..0e565afd70c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -40,6 +40,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
 import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
 import 
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.external.commons.io.FileUtils;
@@ -297,8 +298,9 @@ public class ActiveLoadTsFileLoader {
 
   private void removeFileAndResourceAndModsToFailDir(final String filePath) {
     removeToFailDir(filePath);
-    removeToFailDir(filePath + ".resource");
-    removeToFailDir(filePath + ".mods");
+    removeToFailDir(LoadUtil.getTsFileResourcePath(filePath));
+    removeToFailDir(LoadUtil.getTsFileModsV1Path(filePath));
+    removeToFailDir(LoadUtil.getTsFileModsV2Path(filePath));
   }
 
   private void removeToFailDir(final String filePath) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
index 9a6be9737af..a011490ca49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
@@ -27,9 +27,7 @@ import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTable
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -125,9 +123,9 @@ public class 
LoadTableStatementDataTypeConvertExecutionVisitor
               tsfile -> {
                 FileUtils.deleteQuietly(tsfile);
                 final String tsFilePath = tsfile.getAbsolutePath();
-                FileUtils.deleteQuietly(new File(tsFilePath + 
TsFileResource.RESOURCE_SUFFIX));
-                FileUtils.deleteQuietly(new File(tsFilePath + 
ModificationFileV1.FILE_SUFFIX));
-                FileUtils.deleteQuietly(new File(tsFilePath + 
ModificationFile.FILE_SUFFIX));
+                FileUtils.deleteQuietly(new 
File(LoadUtil.getTsFileResourcePath(tsFilePath)));
+                FileUtils.deleteQuietly(new 
File(LoadUtil.getTsFileModsV1Path(tsFilePath)));
+                FileUtils.deleteQuietly(new 
File(LoadUtil.getTsFileModsV2Path(tsFilePath)));
               });
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index 2999d436c95..226966454aa 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -29,11 +29,9 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import 
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
 import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.tsfile.external.commons.io.FileUtils;
@@ -178,9 +176,9 @@ public class 
LoadTreeStatementDataTypeConvertExecutionVisitor
               tsfile -> {
                 FileUtils.deleteQuietly(tsfile);
                 final String tsFilePath = tsfile.getAbsolutePath();
-                FileUtils.deleteQuietly(new File(tsFilePath + 
TsFileResource.RESOURCE_SUFFIX));
-                FileUtils.deleteQuietly(new File(tsFilePath + 
ModificationFileV1.FILE_SUFFIX));
-                FileUtils.deleteQuietly(new File(tsFilePath + 
ModificationFile.FILE_SUFFIX));
+                FileUtils.deleteQuietly(new 
File(LoadUtil.getTsFileResourcePath(tsFilePath)));
+                FileUtils.deleteQuietly(new 
File(LoadUtil.getTsFileModsV1Path(tsFilePath)));
+                FileUtils.deleteQuietly(new 
File(LoadUtil.getTsFileModsV2Path(tsFilePath)));
               });
     }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java
similarity index 74%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java
index e3dbe43507d..a3d29337b86 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java
@@ -17,11 +17,15 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.storageengine.load.active;
+package org.apache.iotdb.db.storageengine.load.util;
 
 import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import 
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
 import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -40,9 +44,9 @@ import java.util.Objects;
 import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
 import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
 
-public class ActiveLoadUtil {
+public class LoadUtil {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadUtil.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(LoadUtil.class);
 
   private static volatile ILoadDiskSelector loadDiskSelector = 
updateLoadDiskSelector();
 
@@ -68,6 +72,37 @@ public class ActiveLoadUtil {
     return true;
   }
 
+  public static String getTsFilePath(final String 
filePathWithResourceOrModsTail) {
+    if 
(filePathWithResourceOrModsTail.endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+      return filePathWithResourceOrModsTail.substring(
+          0, filePathWithResourceOrModsTail.length() - 
TsFileResource.RESOURCE_SUFFIX.length());
+    }
+
+    if 
(filePathWithResourceOrModsTail.endsWith(ModificationFileV1.FILE_SUFFIX)) {
+      return filePathWithResourceOrModsTail.substring(
+          0, filePathWithResourceOrModsTail.length() - 
ModificationFileV1.FILE_SUFFIX.length());
+    }
+
+    if (filePathWithResourceOrModsTail.endsWith(ModificationFile.FILE_SUFFIX)) 
{
+      return filePathWithResourceOrModsTail.substring(
+          0, filePathWithResourceOrModsTail.length() - 
ModificationFile.FILE_SUFFIX.length());
+    }
+
+    return filePathWithResourceOrModsTail;
+  }
+
+  public static String getTsFileModsV1Path(final String tsFilePath) {
+    return tsFilePath + ModificationFileV1.FILE_SUFFIX;
+  }
+
+  public static String getTsFileModsV2Path(final String tsFilePath) {
+    return tsFilePath + ModificationFile.FILE_SUFFIX;
+  }
+
+  public static String getTsFileResourcePath(final String tsFilePath) {
+    return tsFilePath + TsFileResource.RESOURCE_SUFFIX;
+  }
+
   private static boolean loadTsFilesToActiveDir(
       final Map<String, String> loadAttributes, final File file, final boolean 
isDeleteAfterLoad)
       throws IOException {
@@ -93,9 +128,11 @@ public class ActiveLoadUtil {
     final File targetDir = 
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
 
     loadTsFileAsyncToTargetDir(
-        targetDir, new File(file.getAbsolutePath() + ".resource"), 
isDeleteAfterLoad);
+        targetDir, new File(getTsFileResourcePath(file.getAbsolutePath())), 
isDeleteAfterLoad);
+    loadTsFileAsyncToTargetDir(
+        targetDir, new File(getTsFileModsV1Path(file.getAbsolutePath())), 
isDeleteAfterLoad);
     loadTsFileAsyncToTargetDir(
-        targetDir, new File(file.getAbsolutePath() + ".mods"), 
isDeleteAfterLoad);
+        targetDir, new File(getTsFileModsV2Path(file.getAbsolutePath())), 
isDeleteAfterLoad);
     loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad);
     return true;
   }
@@ -182,7 +219,7 @@ public class ActiveLoadUtil {
               return new File(finalFolderManager.getNextFolder());
             });
 
-    ActiveLoadUtil.loadDiskSelector = loadDiskSelector;
+    LoadUtil.loadDiskSelector = loadDiskSelector;
     return loadDiskSelector;
   }
 }

Reply via email to