This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new f708b966238 Load: Active Load supports ModV2 (#16769)
f708b966238 is described below
commit f708b96623861172ed3a4cf5a6d29433565549ef
Author: Zhenyu Luo <[email protected]>
AuthorDate: Thu Nov 20 18:10:19 2025 +0800
Load: Active Load supports ModV2 (#16769)
---
.../protocol/thrift/IoTDBDataNodeReceiver.java | 4 +-
.../plan/analyze/load/LoadTsFileAnalyzer.java | 5 +--
.../plan/node/load/LoadSingleTsFileNode.java | 6 +--
.../load/active/ActiveLoadDirScanner.java | 20 ++-------
.../load/active/ActiveLoadTsFileLoader.java | 6 ++-
...leStatementDataTypeConvertExecutionVisitor.java | 10 ++---
...eeStatementDataTypeConvertExecutionVisitor.java | 10 ++---
.../ActiveLoadUtil.java => util/LoadUtil.java} | 49 +++++++++++++++++++---
8 files changed, 65 insertions(+), 45 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index 652530d11b4..4d4bf7b3926 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -101,7 +101,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
-import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
@@ -580,7 +580,7 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
null,
shouldMarkAsPipeRequest.get());
- if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
+ if (!LoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
throw new PipeException("Load active listening pipe dir is not set.");
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 076b529ab91..8202de5c499 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -43,9 +43,9 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
-import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -291,8 +291,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
tabletConversionThresholdBytes,
isGeneratedByPipe);
- if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
- tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
+ if (LoadUtil.loadTsFileAsyncToActiveDir(tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
analysis.setFinishQueryAfterAnalyze(true);
setRealStatement(analysis);
return true;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index c8691868c82..604fda6e1e8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -30,8 +30,8 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
import org.apache.tsfile.exception.NotImplementedException;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -229,10 +229,10 @@ public class LoadSingleTsFileNode extends WritePlanNode {
if (deleteAfterLoad) {
Files.deleteIfExists(tsFile.toPath());
Files.deleteIfExists(
- new File(tsFile.getAbsolutePath() + TsFileResource.RESOURCE_SUFFIX).toPath());
+ new File(LoadUtil.getTsFileResourcePath(tsFile.getAbsolutePath())).toPath());
Files.deleteIfExists(ModificationFile.getExclusiveMods(tsFile).toPath());
Files.deleteIfExists(
- new File(tsFile.getAbsolutePath() + ModificationFileV1.FILE_SUFFIX).toPath());
+ new File(LoadUtil.getTsFileModsV1Path(tsFile.getAbsolutePath())).toPath());
}
} catch (final IOException e) {
LOGGER.warn("Delete After Loading {} error.", tsFile, e);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index 470cf702b22..cedc4d5e29f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.external.commons.io.FileUtils;
@@ -51,9 +52,6 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadDirScanner.class);
- private static final String RESOURCE = ".resource";
- private static final String MODS = ".mods";
-
private final AtomicReference<String[]> listeningDirsConfig = new
AtomicReference<>();
private final Set<String> listeningDirs = new CopyOnWriteArraySet<>();
@@ -110,11 +108,7 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
fileStream
.filter(file ->
!activeLoadTsFileLoader.isFilePendingOrLoading(file))
.filter(File::exists)
- .map(
- file ->
- (file.getName().endsWith(RESOURCE) || file.getName().endsWith(MODS))
- ? getTsFilePath(file.getAbsolutePath())
- : file.getAbsolutePath())
+ .map(file -> LoadUtil.getTsFilePath(file.getAbsolutePath()))
.filter(this::isTsFileCompleted)
.limit(currentAllowedPendingSize)
.forEach(
@@ -202,7 +196,7 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs());
listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs()));
- ActiveLoadUtil.updateLoadDiskSelector();
+ LoadUtil.updateLoadDiskSelector();
}
}
}
@@ -235,14 +229,6 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
}
}
- private static String getTsFilePath(final String filePathWithResourceOrModsTail) {
- return filePathWithResourceOrModsTail.endsWith(RESOURCE)
- ? filePathWithResourceOrModsTail.substring(
- 0, filePathWithResourceOrModsTail.length() - RESOURCE.length())
- : filePathWithResourceOrModsTail.substring(
- 0, filePathWithResourceOrModsTail.length() - MODS.length());
- }
-
// Metrics
public long countAndReportActiveListeningDirsFileNumber() {
long totalFileCount = 0;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
index 20817c94146..0e565afd70c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
import
org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesSizeMetricsSet;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.external.commons.io.FileUtils;
@@ -297,8 +298,9 @@ public class ActiveLoadTsFileLoader {
private void removeFileAndResourceAndModsToFailDir(final String filePath) {
removeToFailDir(filePath);
- removeToFailDir(filePath + ".resource");
- removeToFailDir(filePath + ".mods");
+ removeToFailDir(LoadUtil.getTsFileResourcePath(filePath));
+ removeToFailDir(LoadUtil.getTsFileModsV1Path(filePath));
+ removeToFailDir(LoadUtil.getTsFileModsV2Path(filePath));
}
private void removeToFailDir(final String filePath) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
index 9a6be9737af..a011490ca49 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
@@ -27,9 +27,7 @@ import
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTable
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LoadTsFile;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -125,9 +123,9 @@ public class
LoadTableStatementDataTypeConvertExecutionVisitor
tsfile -> {
FileUtils.deleteQuietly(tsfile);
final String tsFilePath = tsfile.getAbsolutePath();
- FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX));
- FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX));
- FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX));
+ FileUtils.deleteQuietly(new File(LoadUtil.getTsFileResourcePath(tsFilePath)));
+ FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV1Path(tsFilePath)));
+ FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV2Path(tsFilePath)));
});
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index 2999d436c95..226966454aa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -29,11 +29,9 @@ import
org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
-import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
-import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
+import org.apache.iotdb.db.storageengine.load.util.LoadUtil;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.external.commons.io.FileUtils;
@@ -178,9 +176,9 @@ public class
LoadTreeStatementDataTypeConvertExecutionVisitor
tsfile -> {
FileUtils.deleteQuietly(tsfile);
final String tsFilePath = tsfile.getAbsolutePath();
- FileUtils.deleteQuietly(new File(tsFilePath + TsFileResource.RESOURCE_SUFFIX));
- FileUtils.deleteQuietly(new File(tsFilePath + ModificationFileV1.FILE_SUFFIX));
- FileUtils.deleteQuietly(new File(tsFilePath + ModificationFile.FILE_SUFFIX));
+ FileUtils.deleteQuietly(new File(LoadUtil.getTsFileResourcePath(tsFilePath)));
+ FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV1Path(tsFilePath)));
+ FileUtils.deleteQuietly(new File(LoadUtil.getTsFileModsV2Path(tsFilePath)));
});
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java
similarity index 74%
rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java
index e3dbe43507d..a3d29337b86 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/util/LoadUtil.java
@@ -17,11 +17,15 @@
* under the License.
*/
-package org.apache.iotdb.db.storageengine.load.active;
+package org.apache.iotdb.db.storageengine.load.util;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
+import
org.apache.iotdb.db.storageengine.dataregion.modification.v1.ModificationFileV1;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -40,9 +44,9 @@ import java.util.Objects;
import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
-public class ActiveLoadUtil {
+public class LoadUtil {
- private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadUtil.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(LoadUtil.class);
private static volatile ILoadDiskSelector loadDiskSelector =
updateLoadDiskSelector();
@@ -68,6 +72,37 @@ public class ActiveLoadUtil {
return true;
}
+ public static String getTsFilePath(final String filePathWithResourceOrModsTail) {
+ if (filePathWithResourceOrModsTail.endsWith(TsFileResource.RESOURCE_SUFFIX)) {
+ return filePathWithResourceOrModsTail.substring(
+ 0, filePathWithResourceOrModsTail.length() - TsFileResource.RESOURCE_SUFFIX.length());
+ }
+
+ if (filePathWithResourceOrModsTail.endsWith(ModificationFileV1.FILE_SUFFIX)) {
+ return filePathWithResourceOrModsTail.substring(
+ 0, filePathWithResourceOrModsTail.length() - ModificationFileV1.FILE_SUFFIX.length());
+ }
+
+ if (filePathWithResourceOrModsTail.endsWith(ModificationFile.FILE_SUFFIX)) {
+ return filePathWithResourceOrModsTail.substring(
+ 0, filePathWithResourceOrModsTail.length() - ModificationFile.FILE_SUFFIX.length());
+ }
+
+ return filePathWithResourceOrModsTail;
+ }
+
+ public static String getTsFileModsV1Path(final String tsFilePath) {
+ return tsFilePath + ModificationFileV1.FILE_SUFFIX;
+ }
+
+ public static String getTsFileModsV2Path(final String tsFilePath) {
+ return tsFilePath + ModificationFile.FILE_SUFFIX;
+ }
+
+ public static String getTsFileResourcePath(final String tsFilePath) {
+ return tsFilePath + TsFileResource.RESOURCE_SUFFIX;
+ }
+
private static boolean loadTsFilesToActiveDir(
final Map<String, String> loadAttributes, final File file, final boolean
isDeleteAfterLoad)
throws IOException {
@@ -93,9 +128,11 @@ public class ActiveLoadUtil {
final File targetDir =
ActiveLoadPathHelper.resolveTargetDir(targetFilePath, attributes);
loadTsFileAsyncToTargetDir(
- targetDir, new File(file.getAbsolutePath() + ".resource"), isDeleteAfterLoad);
+ targetDir, new File(getTsFileResourcePath(file.getAbsolutePath())), isDeleteAfterLoad);
+ loadTsFileAsyncToTargetDir(
+ targetDir, new File(getTsFileModsV1Path(file.getAbsolutePath())), isDeleteAfterLoad);
loadTsFileAsyncToTargetDir(
- targetDir, new File(file.getAbsolutePath() + ".mods"), isDeleteAfterLoad);
+ targetDir, new File(getTsFileModsV2Path(file.getAbsolutePath())), isDeleteAfterLoad);
loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad);
return true;
}
@@ -182,7 +219,7 @@ public class ActiveLoadUtil {
return new File(finalFolderManager.getNextFolder());
});
- ActiveLoadUtil.loadDiskSelector = loadDiskSelector;
+ LoadUtil.loadDiskSelector = loadDiskSelector;
return loadDiskSelector;
}
}