This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 432e0273261 Pipe/Load: Implement multi-disk awareness of multiple file
systems during file copying and moving (#15356)
432e0273261 is described below
commit 432e0273261d3fc5b42cf660cfc219457bc3377c
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jul 28 17:51:53 2025 +0800
Pipe/Load: Implement multi-disk awareness of multiple file systems during
file copying and moving (#15356)
* Pipe/Load: Implement multi-disk awareness of multiple file systems during
file copying and moving
* spotless
* fix
* fix
* update ActiveLoadUtil
* update InheritSystemMultiDisksStrategySelector
* update ILoadDisksSelector
* update ActiveLoadUtil
* fix
* fix
* modify the code based on review comments
* modify the code based on review comments
* spotless
* fix
* fix
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 6 +-
.../protocol/thrift/IoTDBDataNodeReceiver.java | 32 +---
.../plan/analyze/load/LoadTsFileAnalyzer.java | 88 ++--------
.../db/storageengine/dataregion/DataRegion.java | 72 ++++----
.../load/active/ActiveLoadDirScanner.java | 1 +
.../storageengine/load/active/ActiveLoadUtil.java | 183 +++++++++++++++++++++
.../storageengine/load/disk/ILoadDiskSelector.java | 36 +++-
.../InheritSystemMultiDisksStrategySelector.java | 50 ++----
.../db/storageengine/load/disk/MinIOSelector.java | 62 ++++---
9 files changed, 311 insertions(+), 219 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b751c150585..30c343ba358 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -3912,7 +3912,11 @@ public class IoTDBConfig {
}
public String getLoadDiskSelectStrategyForIoTV2AndPipe() {
- return loadDiskSelectStrategyForIoTV2AndPipe;
+ return LoadDiskSelectorType.INHERIT_LOAD
+ .getValue()
+ .equals(loadDiskSelectStrategyForIoTV2AndPipe)
+ ? getLoadDiskSelectStrategy()
+ : loadDiskSelectStrategyForIoTV2AndPipe;
}
public void setLoadDiskSelectStrategyForIoTV2AndPipe(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c899eca9435..9220d236c82 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -37,8 +37,6 @@ import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFil
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
-import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -95,6 +93,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
@@ -110,7 +109,6 @@ import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
@@ -567,35 +565,9 @@ public class IoTDBDataNodeReceiver extends
IoTDBFileReceiver {
private TSStatus loadTsFileAsync(final String dataBaseName, final
List<String> absolutePaths)
throws IOException {
- final String loadActiveListeningPipeDir =
IOTDB_CONFIG.getLoadActiveListeningPipeDir();
- if (Objects.isNull(loadActiveListeningPipeDir)) {
+ if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths,
true)) {
throw new PipeException("Load active listening pipe dir is not set.");
}
-
- if (Objects.nonNull(dataBaseName)) {
- final File targetDir = new File(loadActiveListeningPipeDir,
dataBaseName);
- return this.loadTsFileAsyncToTargetDir(targetDir, absolutePaths);
- }
-
- return loadTsFileAsyncToTargetDir(new File(loadActiveListeningPipeDir),
absolutePaths);
- }
-
- private TSStatus loadTsFileAsyncToTargetDir(
- final File targetDir, final List<String> absolutePaths) throws
IOException {
- for (final String absolutePath : absolutePaths) {
- if (absolutePath == null) {
- continue;
- }
- final File sourceFile = new File(absolutePath);
- if (!Objects.equals(
- targetDir.getAbsolutePath(),
sourceFile.getParentFile().getAbsolutePath())) {
- RetryUtils.retryOnException(
- () -> {
- FileUtils.moveFileWithMD5Check(sourceFile, targetDir);
- return null;
- });
- }
- }
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 8ec6bd7bb48..0f90d7c8724 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
@@ -45,6 +44,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
@@ -75,8 +75,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
-import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
import static
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
import static
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS_ASYNC_MOVE;
@@ -276,83 +274,29 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
private boolean doAsyncLoad(final IAnalysis analysis) {
final long startTime = System.nanoTime();
try {
- final String[] loadActiveListeningDirs =
-
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
- String targetFilePath = null;
- for (int i = 0, size = loadActiveListeningDirs == null ? 0 :
loadActiveListeningDirs.length;
- i < size;
- i++) {
- if (loadActiveListeningDirs[i] != null) {
- targetFilePath = loadActiveListeningDirs[i];
- break;
- }
- }
- if (targetFilePath == null) {
- LOGGER.warn("Load active listening dir is not set. Will try sync load
instead.");
- return false;
+ final String databaseName;
+ if (Objects.nonNull(databaseForTableData)
+ || (Objects.nonNull(context) &&
context.getDatabaseName().isPresent())) {
+ databaseName =
+ Objects.nonNull(databaseForTableData)
+ ? databaseForTableData
+ : context.getDatabaseName().get();
+ } else {
+ databaseName = null;
}
-
- try {
- if (Objects.nonNull(databaseForTableData)
- || (Objects.nonNull(context) &&
context.getDatabaseName().isPresent())) {
- loadTsFilesAsyncToTargetDir(
- new File(
- targetFilePath,
- databaseForTableData =
- Objects.nonNull(databaseForTableData)
- ? databaseForTableData
- : context.getDatabaseName().get()),
- tsFiles);
- } else {
- loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
- }
- } catch (Exception e) {
- LOGGER.warn(
- "Failed to async load tsfiles {} to target dir {}. Will try sync
load instead.",
- tsFiles,
- targetFilePath,
- e);
- return false;
+ if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, databaseName,
isDeleteAfterLoad)) {
+ analysis.setFinishQueryAfterAnalyze(true);
+ setRealStatement(analysis);
+ return true;
}
-
- analysis.setFinishQueryAfterAnalyze(true);
- setRealStatement(analysis);
- return true;
+ LOGGER.info("Async Load has failed, and is now trying to load sync");
+ return false;
} finally {
LoadTsFileCostMetricsSet.getInstance()
.recordPhaseTimeCost(ANALYSIS_ASYNC_MOVE, System.nanoTime() -
startTime);
}
}
- private void loadTsFilesAsyncToTargetDir(final File targetDir, final
List<File> files)
- throws IOException {
- for (final File file : files) {
- if (file == null) {
- continue;
- }
-
- loadTsFileAsyncToTargetDir(targetDir, file);
- loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() +
".resource"));
- loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() +
".mods"));
- }
- }
-
- private void loadTsFileAsyncToTargetDir(final File targetDir, final File
file)
- throws IOException {
- if (!file.exists()) {
- return;
- }
- RetryUtils.retryOnException(
- () -> {
- if (isDeleteAfterLoad) {
- moveFileWithMD5Check(file, targetDir);
- } else {
- copyFileWithMD5Check(file, targetDir);
- }
- return null;
- });
- }
-
private boolean doAnalyzeFileByFile(IAnalysis analysis) {
// analyze tsfile metadata file by file
for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index fdfcd18c64a..d0cd62deacc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -137,8 +138,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
-import
org.apache.iotdb.db.storageengine.load.disk.InheritSystemMultiDisksStrategySelector;
-import org.apache.iotdb.db.storageengine.load.disk.MinIOSelector;
import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -430,27 +429,30 @@ public class DataRegion implements IDataRegionForQuery {
}
private void initDiskSelector() {
- switch
(ILoadDiskSelector.LoadDiskSelectorType.fromValue(config.getLoadDiskSelectStrategy()))
{
- case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
- ordinaryLoadDiskSelector = new
InheritSystemMultiDisksStrategySelector();
- break;
- case MIN_IO_FIRST:
- default:
- ordinaryLoadDiskSelector = new MinIOSelector();
- }
+ final ILoadDiskSelector.DiskDirectorySelector selector =
+ (sourceDirectory, fileName, tierLevel) -> {
+ try {
+ return TierManager.getInstance()
+ .getFolderManager(tierLevel, false)
+ .getNextWithRetry(folder -> fsFactory.getFile(folder,
fileName));
+ } catch (DiskSpaceInsufficientException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new LoadFileException(
+ String.format("Storage allocation failed for %s (tier %d)",
fileName, tierLevel),
+ e);
+ }
+ };
- switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(
- config.getLoadDiskSelectStrategyForIoTV2AndPipe())) {
- case MIN_IO_FIRST:
- pipeAndIoTV2LoadDiskSelector = new MinIOSelector();
- break;
- case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
- pipeAndIoTV2LoadDiskSelector = new
InheritSystemMultiDisksStrategySelector();
- break;
- case INHERIT_LOAD:
- default:
- pipeAndIoTV2LoadDiskSelector = ordinaryLoadDiskSelector;
- }
+ final String[] dirs =
+ Arrays.stream(config.getTierDataDirs()[0])
+ .map(v -> fsFactory.getFile(v,
IoTDBConstant.UNSEQUENCE_FOLDER_NAME).getPath())
+ .toArray(String[]::new);
+ ordinaryLoadDiskSelector =
+ ILoadDiskSelector.initDiskSelector(config.getLoadDiskSelectStrategy(),
dirs, selector);
+ pipeAndIoTV2LoadDiskSelector =
+ ILoadDiskSelector.initDiskSelector(
+ config.getLoadDiskSelectStrategyForIoTV2AndPipe(), dirs, selector);
}
@Override
@@ -3274,22 +3276,20 @@ public class DataRegion implements IDataRegionForQuery {
boolean isGeneratedByPipe)
throws LoadFileException, DiskSpaceInsufficientException {
final int targetTierLevel = 0;
+ final String fileName =
+ databaseName
+ + File.separatorChar
+ + dataRegionId
+ + File.separatorChar
+ + filePartitionId
+ + File.separator
+ + tsFileResource.getTsFile().getName();
final File targetFile =
(tsFileResource.isGeneratedByPipeConsensus() ||
tsFileResource.isGeneratedByPipe())
- ? pipeAndIoTV2LoadDiskSelector.getTargetFile(
- tsFileToLoad,
- databaseName,
- dataRegionId,
- filePartitionId,
- tsFileResource.getTsFile().getName(),
- targetTierLevel)
- : ordinaryLoadDiskSelector.getTargetFile(
- tsFileToLoad,
- databaseName,
- dataRegionId,
- filePartitionId,
- tsFileResource.getTsFile().getName(),
- targetTierLevel);
+ ? pipeAndIoTV2LoadDiskSelector.selectTargetDirectory(
+ tsFileToLoad.getParentFile(), fileName, true, targetTierLevel)
+ : ordinaryLoadDiskSelector.selectTargetDirectory(
+ tsFileToLoad.getParentFile(), fileName, true, targetTierLevel);
tsFileResource.setFile(targetFile);
if (tsFileManager.contains(tsFileResource, false)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index b720b2be6b0..27b36c20793 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -188,6 +188,7 @@ public class ActiveLoadDirScanner extends
ActiveLoadScheduledExecutorService {
listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs());
listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs()));
+ ActiveLoadUtil.updateLoadDiskSelector();
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
new file mode 100644
index 00000000000..5bfa9b71105
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.commons.utils.RetryUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
+import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
+import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
+import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
+
+public class ActiveLoadUtil {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ActiveLoadUtil.class);
+
+ private static volatile ILoadDiskSelector loadDiskSelector =
updateLoadDiskSelector();
+
+ public static boolean loadTsFileAsyncToActiveDir(
+ final List<File> tsFiles, final String dataBaseName, final boolean
isDeleteAfterLoad) {
+ if (tsFiles == null || tsFiles.isEmpty()) {
+ return true;
+ }
+
+ try {
+ for (File file : tsFiles) {
+ if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) {
+ return false;
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Fail to load tsfile to Active dir", e);
+ return false;
+ }
+
+ return true;
+ }
+
+ private static boolean loadTsFilesToActiveDir(
+ final String dataBaseName, final File file, final boolean
isDeleteAfterLoad)
+ throws IOException {
+ if (file == null) {
+ return true;
+ }
+
+ final File targetFilePath;
+ try {
+ targetFilePath =
+ loadDiskSelector.selectTargetDirectory(file.getParentFile(),
file.getName(), false, 0);
+ } catch (Exception e) {
+ LOGGER.warn("Fail to load disk space of file {}",
file.getAbsolutePath(), e);
+ return false;
+ }
+
+ if (targetFilePath == null) {
+ LOGGER.warn("Load active listening dir is not set.");
+ return false;
+ }
+ final File targetDir;
+ if (Objects.nonNull(dataBaseName)) {
+ targetDir = new File(targetFilePath, dataBaseName);
+ } else {
+ targetDir = targetFilePath;
+ }
+
+ loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad);
+ loadTsFileAsyncToTargetDir(
+ targetDir, new File(file.getAbsolutePath() + ".resource"),
isDeleteAfterLoad);
+ loadTsFileAsyncToTargetDir(
+ targetDir, new File(file.getAbsolutePath() + ".mods"),
isDeleteAfterLoad);
+ return true;
+ }
+
+ public static boolean loadFilesToActiveDir(
+ final String dataBaseName, final List<String> files, final boolean
isDeleteAfterLoad)
+ throws IOException {
+ if (files == null || files.isEmpty()) {
+ return true;
+ }
+
+ final File targetFilePath;
+ try {
+ final File file = new File(files.get(0));
+ targetFilePath =
+ loadDiskSelector.selectTargetDirectory(file.getParentFile(),
file.getName(), false, 0);
+ } catch (Exception e) {
+ LOGGER.warn("Fail to load disk space of file {}", files.get(0), e);
+ return false;
+ }
+
+ if (targetFilePath == null) {
+ LOGGER.warn("Load active listening dir is not set.");
+ return false;
+ }
+ final File targetDir;
+ if (Objects.nonNull(dataBaseName)) {
+ targetDir = new File(targetFilePath, dataBaseName);
+ } else {
+ targetDir = targetFilePath;
+ }
+
+ for (final String file : files) {
+ loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad);
+ }
+ return true;
+ }
+
+ private static void loadTsFileAsyncToTargetDir(
+ final File targetDir, final File file, final boolean isDeleteAfterLoad)
throws IOException {
+ if (!file.exists()) {
+ return;
+ }
+ RetryUtils.retryOnException(
+ () -> {
+ if (isDeleteAfterLoad) {
+ moveFileWithMD5Check(file, targetDir);
+ } else {
+ copyFileWithMD5Check(file, targetDir);
+ }
+ return null;
+ });
+ }
+
+ public static ILoadDiskSelector updateLoadDiskSelector() {
+ final String[] dirs =
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+ FolderManager folderManager = null;
+ DiskSpaceInsufficientException exception = null;
+
+ try {
+ folderManager =
+ new FolderManager(Arrays.asList(dirs),
DirectoryStrategyType.SEQUENCE_STRATEGY);
+ } catch (DiskSpaceInsufficientException e) {
+ // It should be noted that if this exception is not ignored, the entire process may fail to
+ // start.
+ exception = e;
+ LOGGER.warn("Failed to load active listening dirs", e);
+ }
+
+ final FolderManager finalFolderManager = folderManager;
+ final DiskSpaceInsufficientException finalException = exception;
+ ILoadDiskSelector loadDiskSelector =
+ ILoadDiskSelector.initDiskSelector(
+
IoTDBDescriptor.getInstance().getConfig().getLoadDiskSelectStrategy(),
+ dirs,
+ (sourceDir, fileName, tierLevel) -> {
+ if (finalException != null) {
+ throw finalException;
+ }
+ return new File(finalFolderManager.getNextFolder());
+ });
+
+ ActiveLoadUtil.loadDiskSelector = loadDiskSelector;
+ return loadDiskSelector;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
index 269228e9fe8..e9216e9b555 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
@@ -26,15 +26,33 @@ import java.io.File;
public interface ILoadDiskSelector {
- File getTargetFile(
- File fileToLoad,
- String databaseName,
- String dataRegionId,
- long filePartitionId,
- String tsfileName,
- int tierLevel)
+ @FunctionalInterface
+ interface DiskDirectorySelector {
+ File selectDirectory(final File sourceDirectory, final String fileName,
final int tierLevel)
+ throws DiskSpaceInsufficientException, LoadFileException;
+ }
+
+ File selectTargetDirectory(
+ final File sourceDirectory,
+ final String fileName,
+ final boolean appendFileName,
+ final int tierLevel)
throws DiskSpaceInsufficientException, LoadFileException;
+ static ILoadDiskSelector initDiskSelector(
+ final String selectStrategy, final String[] dirs, final
DiskDirectorySelector selector) {
+ final ILoadDiskSelector diskSelector;
+ switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(selectStrategy)) {
+ case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
+ diskSelector = new InheritSystemMultiDisksStrategySelector(selector);
+ break;
+ case MIN_IO_FIRST:
+ default:
+ diskSelector = new MinIOSelector(dirs, selector);
+ }
+ return diskSelector;
+ }
+
enum LoadDiskSelectorType {
MIN_IO_FIRST("MIN_IO_FIRST"),
INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY("INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY"),
@@ -44,7 +62,7 @@ public interface ILoadDiskSelector {
private final String value;
- LoadDiskSelectorType(String value) {
+ LoadDiskSelectorType(final String value) {
this.value = value;
}
@@ -52,7 +70,7 @@ public interface ILoadDiskSelector {
return value;
}
- public static LoadDiskSelectorType fromValue(String value) {
+ public static LoadDiskSelectorType fromValue(final String value) {
if (value.equalsIgnoreCase(MIN_IO_FIRST.getValue())) {
return MIN_IO_FIRST;
} else if
(value.equalsIgnoreCase(INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY.getValue()))
{
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
index 7739c42a538..c56addc4bf9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
@@ -21,56 +21,28 @@ package org.apache.iotdb.db.storageengine.load.disk;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.load.LoadFileException;
-import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.tsfile.fileSystem.FSFactoryProducer;
import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.File;
public class InheritSystemMultiDisksStrategySelector implements
ILoadDiskSelector {
- protected final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
- private static final Logger logger =
- LoggerFactory.getLogger(InheritSystemMultiDisksStrategySelector.class);
+ protected static final FSFactory fsFactory =
FSFactoryProducer.getFSFactory();
- public InheritSystemMultiDisksStrategySelector() {
- // empty body
+ protected final DiskDirectorySelector directorySelector;
+
+ public InheritSystemMultiDisksStrategySelector(final DiskDirectorySelector
selector) {
+ this.directorySelector = selector;
}
- public File getTargetFile(
- File fileToLoad,
- String databaseName,
- String dataRegionId,
- long filePartitionId,
- String tsfileName,
- int tierLevel)
+ public File selectTargetDirectory(
+ final File sourceDirectory,
+ final String FileName,
+ final boolean appendFileName,
+ final int tierLevel)
throws DiskSpaceInsufficientException, LoadFileException {
- try {
- return TierManager.getInstance()
- .getFolderManager(tierLevel, false)
- .getNextWithRetry(
- folder -> {
- return fsFactory.getFile(
- folder,
- databaseName
- + File.separatorChar
- + dataRegionId
- + File.separatorChar
- + filePartitionId
- + File.separator
- + tsfileName);
- });
- } catch (DiskSpaceInsufficientException e) {
- throw e;
- } catch (Exception e) {
- throw new LoadFileException(
- String.format(
- "Storage allocation failed for %s/%s/%s (tier %d)",
- databaseName, dataRegionId, filePartitionId, tierLevel),
- e);
- }
+ return directorySelector.selectDirectory(sourceDirectory, FileName,
tierLevel);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
index d18be675f25..9956148b083 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.storageengine.load.disk;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.metrics.utils.FileStoreUtils;
@@ -32,6 +29,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.nio.file.FileStore;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -40,15 +38,20 @@ import java.util.Optional;
public class MinIOSelector extends InheritSystemMultiDisksStrategySelector {
private static final Logger logger =
LoggerFactory.getLogger(MinIOSelector.class);
- private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+
private final Map<String, String> rootDisks2DataDirsMapForLoad;
- public MinIOSelector() {
+ public MinIOSelector(final String[] dirs, final DiskDirectorySelector
selector) {
+ super(selector);
+ if (dirs == null || dirs.length == 0) {
+ rootDisks2DataDirsMapForLoad = Collections.emptyMap();
+ logger.warn("MinIO selector requires at least one directory");
+ return;
+ }
// init data dirs' root disks
- this.rootDisks2DataDirsMapForLoad = new
HashMap<>(config.getTierDataDirs()[0].length);
- Arrays.stream(config.getTierDataDirs()[0])
+ this.rootDisks2DataDirsMapForLoad = new HashMap<>(dirs.length);
+ Arrays.stream(dirs)
.filter(Objects::nonNull)
- .map(v -> fsFactory.getFile(v,
IoTDBConstant.UNSEQUENCE_FOLDER_NAME).getPath())
.forEach(
dataDirPath -> {
File dataDirFile = new File(dataDirPath);
@@ -70,44 +73,39 @@ public class MinIOSelector extends
InheritSystemMultiDisksStrategySelector {
}
@Override
- public File getTargetFile(
- File fileToLoad,
- String databaseName,
- String dataRegionId,
- long filePartitionId,
- String tsfileName,
- int tierLevel)
+ public File selectTargetDirectory(
+ final File sourceDirectory,
+ final String fileName,
+ final boolean appendFileName,
+ final int tierLevel)
throws DiskSpaceInsufficientException, LoadFileException {
- File targetFile;
String fileDirRoot = null;
try {
fileDirRoot =
-
Optional.ofNullable(FileStoreUtils.getFileStore(fileToLoad.getCanonicalPath()))
+
Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath()))
.map(Object::toString)
.orElse(null);
} catch (Exception e) {
- logger.warn("Exception occurs when reading target file's mount point
{}", filePartitionId, e);
+ logger.warn(
+ "Exception occurs when reading target file's mount point {}",
+ sourceDirectory.getAbsoluteFile(),
+ e);
}
+ File targetFile = null;
if (rootDisks2DataDirsMapForLoad.containsKey(fileDirRoot)) {
- // if there is an overlap between firDirRoot and data directories' disk
roots, try to get
- // targetFile in the same disk
- targetFile =
- fsFactory.getFile(
- rootDisks2DataDirsMapForLoad.get(fileDirRoot),
- databaseName
- + File.separatorChar
- + dataRegionId
- + File.separatorChar
- + filePartitionId
- + File.separator
- + tsfileName);
+ if (appendFileName) {
+ // if there is an overlap between fileDirRoot and data directories' disk roots, try to get
+ // targetFile in the same disk
+ targetFile =
fsFactory.getFile(rootDisks2DataDirsMapForLoad.get(fileDirRoot), fileName);
+ } else {
+ targetFile = new File(rootDisks2DataDirsMapForLoad.get(fileDirRoot));
+ }
return targetFile;
}
// if there isn't an overlap, downgrade to storage balance(sequence)
strategy.
- return super.getTargetFile(
- fileToLoad, databaseName, dataRegionId, filePartitionId, tsfileName,
tierLevel);
+ return super.selectTargetDirectory(sourceDirectory, fileName,
appendFileName, tierLevel);
}
}