This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 432e0273261 Pipe/Load: Implement multi-disk awareness of multiple file 
systems during file copying and moving (#15356)
432e0273261 is described below

commit 432e0273261d3fc5b42cf660cfc219457bc3377c
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Jul 28 17:51:53 2025 +0800

    Pipe/Load: Implement multi-disk awareness of multiple file systems during 
file copying and moving (#15356)
    
    * Pipe/Load: Implement multi-disk awareness of multiple file systems during 
file copying and moving
    
    * spotless
    
    * fix
    
    * fix
    
    * update ActiveLoadUtil
    
    * update InheritSystemMultiDisksStrategySelector
    
    * update ILoadDisksSelector
    
    * update ActiveLoadUtil
    
    * fix
    
    * fix
    
    * modify the code based on review comments
    
    * modify the code based on review comments
    
    * spotless
    
    * fix
    
    * fix
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   6 +-
 .../protocol/thrift/IoTDBDataNodeReceiver.java     |  32 +---
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  88 ++--------
 .../db/storageengine/dataregion/DataRegion.java    |  72 ++++----
 .../load/active/ActiveLoadDirScanner.java          |   1 +
 .../storageengine/load/active/ActiveLoadUtil.java  | 183 +++++++++++++++++++++
 .../storageengine/load/disk/ILoadDiskSelector.java |  36 +++-
 .../InheritSystemMultiDisksStrategySelector.java   |  50 ++----
 .../db/storageengine/load/disk/MinIOSelector.java  |  62 ++++---
 9 files changed, 311 insertions(+), 219 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index b751c150585..30c343ba358 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -3912,7 +3912,11 @@ public class IoTDBConfig {
   }
 
   public String getLoadDiskSelectStrategyForIoTV2AndPipe() {
-    return loadDiskSelectStrategyForIoTV2AndPipe;
+    return LoadDiskSelectorType.INHERIT_LOAD
+            .getValue()
+            .equals(loadDiskSelectStrategyForIoTV2AndPipe)
+        ? getLoadDiskSelectStrategy()
+        : loadDiskSelectStrategyForIoTV2AndPipe;
   }
 
   public void setLoadDiskSelectStrategyForIoTV2AndPipe(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
index c899eca9435..9220d236c82 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java
@@ -37,8 +37,6 @@ import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFil
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferFileSealReqV2;
 import 
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeTransferSliceReq;
 import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
-import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
@@ -95,6 +93,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
@@ -110,7 +109,6 @@ import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.Paths;
@@ -567,35 +565,9 @@ public class IoTDBDataNodeReceiver extends 
IoTDBFileReceiver {
 
   private TSStatus loadTsFileAsync(final String dataBaseName, final 
List<String> absolutePaths)
       throws IOException {
-    final String loadActiveListeningPipeDir = 
IOTDB_CONFIG.getLoadActiveListeningPipeDir();
-    if (Objects.isNull(loadActiveListeningPipeDir)) {
+    if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths, 
true)) {
       throw new PipeException("Load active listening pipe dir is not set.");
     }
-
-    if (Objects.nonNull(dataBaseName)) {
-      final File targetDir = new File(loadActiveListeningPipeDir, 
dataBaseName);
-      return this.loadTsFileAsyncToTargetDir(targetDir, absolutePaths);
-    }
-
-    return loadTsFileAsyncToTargetDir(new File(loadActiveListeningPipeDir), 
absolutePaths);
-  }
-
-  private TSStatus loadTsFileAsyncToTargetDir(
-      final File targetDir, final List<String> absolutePaths) throws 
IOException {
-    for (final String absolutePath : absolutePaths) {
-      if (absolutePath == null) {
-        continue;
-      }
-      final File sourceFile = new File(absolutePath);
-      if (!Objects.equals(
-          targetDir.getAbsolutePath(), 
sourceFile.getParentFile().getAbsolutePath())) {
-        RetryUtils.retryOnException(
-            () -> {
-              FileUtils.moveFileWithMD5Check(sourceFile, targetDir);
-              return null;
-            });
-      }
-    }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 8ec6bd7bb48..0f90d7c8724 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.queryengine.plan.analyze.load;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.auth.AuthException;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.utils.RetryUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
 import org.apache.iotdb.db.exception.load.LoadAnalyzeTypeMismatchException;
@@ -45,6 +44,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
+import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
 import 
org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
 import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
 import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
@@ -75,8 +75,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 
-import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
-import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
 import static 
org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
 import static 
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
 import static 
org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS_ASYNC_MOVE;
@@ -276,83 +274,29 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   private boolean doAsyncLoad(final IAnalysis analysis) {
     final long startTime = System.nanoTime();
     try {
-      final String[] loadActiveListeningDirs =
-          
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
-      String targetFilePath = null;
-      for (int i = 0, size = loadActiveListeningDirs == null ? 0 : 
loadActiveListeningDirs.length;
-          i < size;
-          i++) {
-        if (loadActiveListeningDirs[i] != null) {
-          targetFilePath = loadActiveListeningDirs[i];
-          break;
-        }
-      }
-      if (targetFilePath == null) {
-        LOGGER.warn("Load active listening dir is not set. Will try sync load 
instead.");
-        return false;
+      final String databaseName;
+      if (Objects.nonNull(databaseForTableData)
+          || (Objects.nonNull(context) && 
context.getDatabaseName().isPresent())) {
+        databaseName =
+            Objects.nonNull(databaseForTableData)
+                ? databaseForTableData
+                : context.getDatabaseName().get();
+      } else {
+        databaseName = null;
       }
-
-      try {
-        if (Objects.nonNull(databaseForTableData)
-            || (Objects.nonNull(context) && 
context.getDatabaseName().isPresent())) {
-          loadTsFilesAsyncToTargetDir(
-              new File(
-                  targetFilePath,
-                  databaseForTableData =
-                      Objects.nonNull(databaseForTableData)
-                          ? databaseForTableData
-                          : context.getDatabaseName().get()),
-              tsFiles);
-        } else {
-          loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
-        }
-      } catch (Exception e) {
-        LOGGER.warn(
-            "Failed to async load tsfiles {} to target dir {}. Will try sync 
load instead.",
-            tsFiles,
-            targetFilePath,
-            e);
-        return false;
+      if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, databaseName, 
isDeleteAfterLoad)) {
+        analysis.setFinishQueryAfterAnalyze(true);
+        setRealStatement(analysis);
+        return true;
       }
-
-      analysis.setFinishQueryAfterAnalyze(true);
-      setRealStatement(analysis);
-      return true;
+      LOGGER.info("Async Load has failed, and is now trying to load sync");
+      return false;
     } finally {
       LoadTsFileCostMetricsSet.getInstance()
           .recordPhaseTimeCost(ANALYSIS_ASYNC_MOVE, System.nanoTime() - 
startTime);
     }
   }
 
-  private void loadTsFilesAsyncToTargetDir(final File targetDir, final 
List<File> files)
-      throws IOException {
-    for (final File file : files) {
-      if (file == null) {
-        continue;
-      }
-
-      loadTsFileAsyncToTargetDir(targetDir, file);
-      loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + 
".resource"));
-      loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + 
".mods"));
-    }
-  }
-
-  private void loadTsFileAsyncToTargetDir(final File targetDir, final File 
file)
-      throws IOException {
-    if (!file.exists()) {
-      return;
-    }
-    RetryUtils.retryOnException(
-        () -> {
-          if (isDeleteAfterLoad) {
-            moveFileWithMD5Check(file, targetDir);
-          } else {
-            copyFileWithMD5Check(file, targetDir);
-          }
-          return null;
-        });
-  }
-
   private boolean doAnalyzeFileByFile(IAnalysis analysis) {
     // analyze tsfile metadata file by file
     for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index fdfcd18c64a..d0cd62deacc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -24,6 +24,7 @@ import 
org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.file.SystemFileFactory;
@@ -137,8 +138,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
 import 
org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
 import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
-import 
org.apache.iotdb.db.storageengine.load.disk.InheritSystemMultiDisksStrategySelector;
-import org.apache.iotdb.db.storageengine.load.disk.MinIOSelector;
 import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
 import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
@@ -430,27 +429,30 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   private void initDiskSelector() {
-    switch 
(ILoadDiskSelector.LoadDiskSelectorType.fromValue(config.getLoadDiskSelectStrategy()))
 {
-      case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
-        ordinaryLoadDiskSelector = new 
InheritSystemMultiDisksStrategySelector();
-        break;
-      case MIN_IO_FIRST:
-      default:
-        ordinaryLoadDiskSelector = new MinIOSelector();
-    }
+    final ILoadDiskSelector.DiskDirectorySelector selector =
+        (sourceDirectory, fileName, tierLevel) -> {
+          try {
+            return TierManager.getInstance()
+                .getFolderManager(tierLevel, false)
+                .getNextWithRetry(folder -> fsFactory.getFile(folder, 
fileName));
+          } catch (DiskSpaceInsufficientException e) {
+            throw e;
+          } catch (Exception e) {
+            throw new LoadFileException(
+                String.format("Storage allocation failed for %s (tier %d)", 
fileName, tierLevel),
+                e);
+          }
+        };
 
-    switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(
-        config.getLoadDiskSelectStrategyForIoTV2AndPipe())) {
-      case MIN_IO_FIRST:
-        pipeAndIoTV2LoadDiskSelector = new MinIOSelector();
-        break;
-      case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
-        pipeAndIoTV2LoadDiskSelector = new 
InheritSystemMultiDisksStrategySelector();
-        break;
-      case INHERIT_LOAD:
-      default:
-        pipeAndIoTV2LoadDiskSelector = ordinaryLoadDiskSelector;
-    }
+    final String[] dirs =
+        Arrays.stream(config.getTierDataDirs()[0])
+            .map(v -> fsFactory.getFile(v, 
IoTDBConstant.UNSEQUENCE_FOLDER_NAME).getPath())
+            .toArray(String[]::new);
+    ordinaryLoadDiskSelector =
+        ILoadDiskSelector.initDiskSelector(config.getLoadDiskSelectStrategy(), 
dirs, selector);
+    pipeAndIoTV2LoadDiskSelector =
+        ILoadDiskSelector.initDiskSelector(
+            config.getLoadDiskSelectStrategyForIoTV2AndPipe(), dirs, selector);
   }
 
   @Override
@@ -3274,22 +3276,20 @@ public class DataRegion implements IDataRegionForQuery {
       boolean isGeneratedByPipe)
       throws LoadFileException, DiskSpaceInsufficientException {
     final int targetTierLevel = 0;
+    final String fileName =
+        databaseName
+            + File.separatorChar
+            + dataRegionId
+            + File.separatorChar
+            + filePartitionId
+            + File.separator
+            + tsFileResource.getTsFile().getName();
     final File targetFile =
         (tsFileResource.isGeneratedByPipeConsensus() || 
tsFileResource.isGeneratedByPipe())
-            ? pipeAndIoTV2LoadDiskSelector.getTargetFile(
-                tsFileToLoad,
-                databaseName,
-                dataRegionId,
-                filePartitionId,
-                tsFileResource.getTsFile().getName(),
-                targetTierLevel)
-            : ordinaryLoadDiskSelector.getTargetFile(
-                tsFileToLoad,
-                databaseName,
-                dataRegionId,
-                filePartitionId,
-                tsFileResource.getTsFile().getName(),
-                targetTierLevel);
+            ? pipeAndIoTV2LoadDiskSelector.selectTargetDirectory(
+                tsFileToLoad.getParentFile(), fileName, true, targetTierLevel)
+            : ordinaryLoadDiskSelector.selectTargetDirectory(
+                tsFileToLoad.getParentFile(), fileName, true, targetTierLevel);
 
     tsFileResource.setFile(targetFile);
     if (tsFileManager.contains(tsFileResource, false)) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
index b720b2be6b0..27b36c20793 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java
@@ -188,6 +188,7 @@ public class ActiveLoadDirScanner extends 
ActiveLoadScheduledExecutorService {
 
               
listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs());
               
listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs()));
+              ActiveLoadUtil.updateLoadDiskSelector();
             }
           }
         }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
new file mode 100644
index 00000000000..5bfa9b71105
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.active;
+
+import org.apache.iotdb.commons.utils.RetryUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
+import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
+import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
+import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
+
+public class ActiveLoadUtil {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ActiveLoadUtil.class);
+
+  private static volatile ILoadDiskSelector loadDiskSelector = 
updateLoadDiskSelector();
+
+  public static boolean loadTsFileAsyncToActiveDir(
+      final List<File> tsFiles, final String dataBaseName, final boolean 
isDeleteAfterLoad) {
+    if (tsFiles == null || tsFiles.isEmpty()) {
+      return true;
+    }
+
+    try {
+      for (File file : tsFiles) {
+        if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) {
+          return false;
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Fail to load tsfile to Active dir", e);
+      return false;
+    }
+
+    return true;
+  }
+
+  private static boolean loadTsFilesToActiveDir(
+      final String dataBaseName, final File file, final boolean 
isDeleteAfterLoad)
+      throws IOException {
+    if (file == null) {
+      return true;
+    }
+
+    final File targetFilePath;
+    try {
+      targetFilePath =
+          loadDiskSelector.selectTargetDirectory(file.getParentFile(), 
file.getName(), false, 0);
+    } catch (Exception e) {
+      LOGGER.warn("Fail to load disk space of file {}", 
file.getAbsolutePath(), e);
+      return false;
+    }
+
+    if (targetFilePath == null) {
+      LOGGER.warn("Load active listening dir is not set.");
+      return false;
+    }
+    final File targetDir;
+    if (Objects.nonNull(dataBaseName)) {
+      targetDir = new File(targetFilePath, dataBaseName);
+    } else {
+      targetDir = targetFilePath;
+    }
+
+    loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad);
+    loadTsFileAsyncToTargetDir(
+        targetDir, new File(file.getAbsolutePath() + ".resource"), 
isDeleteAfterLoad);
+    loadTsFileAsyncToTargetDir(
+        targetDir, new File(file.getAbsolutePath() + ".mods"), 
isDeleteAfterLoad);
+    return true;
+  }
+
+  public static boolean loadFilesToActiveDir(
+      final String dataBaseName, final List<String> files, final boolean 
isDeleteAfterLoad)
+      throws IOException {
+    if (files == null || files.isEmpty()) {
+      return true;
+    }
+
+    final File targetFilePath;
+    try {
+      final File file = new File(files.get(0));
+      targetFilePath =
+          loadDiskSelector.selectTargetDirectory(file.getParentFile(), 
file.getName(), false, 0);
+    } catch (Exception e) {
+      LOGGER.warn("Fail to load disk space of file {}", files.get(0), e);
+      return false;
+    }
+
+    if (targetFilePath == null) {
+      LOGGER.warn("Load active listening dir is not set.");
+      return false;
+    }
+    final File targetDir;
+    if (Objects.nonNull(dataBaseName)) {
+      targetDir = new File(targetFilePath, dataBaseName);
+    } else {
+      targetDir = targetFilePath;
+    }
+
+    for (final String file : files) {
+      loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad);
+    }
+    return true;
+  }
+
+  private static void loadTsFileAsyncToTargetDir(
+      final File targetDir, final File file, final boolean isDeleteAfterLoad) 
throws IOException {
+    if (!file.exists()) {
+      return;
+    }
+    RetryUtils.retryOnException(
+        () -> {
+          if (isDeleteAfterLoad) {
+            moveFileWithMD5Check(file, targetDir);
+          } else {
+            copyFileWithMD5Check(file, targetDir);
+          }
+          return null;
+        });
+  }
+
+  public static ILoadDiskSelector updateLoadDiskSelector() {
+    final String[] dirs = 
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
+    FolderManager folderManager = null;
+    DiskSpaceInsufficientException exception = null;
+
+    try {
+      folderManager =
+          new FolderManager(Arrays.asList(dirs), 
DirectoryStrategyType.SEQUENCE_STRATEGY);
+    } catch (DiskSpaceInsufficientException e) {
+      // It should be noted that if this exception is not ignored, the entire 
process may fail to
+      // start.
+      exception = e;
+      LOGGER.warn("Failed to load active listening dirs", e);
+    }
+
+    final FolderManager finalFolderManager = folderManager;
+    final DiskSpaceInsufficientException finalException = exception;
+    ILoadDiskSelector loadDiskSelector =
+        ILoadDiskSelector.initDiskSelector(
+            
IoTDBDescriptor.getInstance().getConfig().getLoadDiskSelectStrategy(),
+            dirs,
+            (sourceDir, fileName, tierLevel) -> {
+              if (finalException != null) {
+                throw finalException;
+              }
+              return new File(finalFolderManager.getNextFolder());
+            });
+
+    ActiveLoadUtil.loadDiskSelector = loadDiskSelector;
+    return loadDiskSelector;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
index 269228e9fe8..e9216e9b555 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java
@@ -26,15 +26,33 @@ import java.io.File;
 
 public interface ILoadDiskSelector {
 
-  File getTargetFile(
-      File fileToLoad,
-      String databaseName,
-      String dataRegionId,
-      long filePartitionId,
-      String tsfileName,
-      int tierLevel)
+  @FunctionalInterface
+  interface DiskDirectorySelector {
+    File selectDirectory(final File sourceDirectory, final String fileName, 
final int tierLevel)
+        throws DiskSpaceInsufficientException, LoadFileException;
+  }
+
+  File selectTargetDirectory(
+      final File sourceDirectory,
+      final String fileName,
+      final boolean appendFileName,
+      final int tierLevel)
       throws DiskSpaceInsufficientException, LoadFileException;
 
+  static ILoadDiskSelector initDiskSelector(
+      final String selectStrategy, final String[] dirs, final 
DiskDirectorySelector selector) {
+    final ILoadDiskSelector diskSelector;
+    switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(selectStrategy)) {
+      case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
+        diskSelector = new InheritSystemMultiDisksStrategySelector(selector);
+        break;
+      case MIN_IO_FIRST:
+      default:
+        diskSelector = new MinIOSelector(dirs, selector);
+    }
+    return diskSelector;
+  }
+
   enum LoadDiskSelectorType {
     MIN_IO_FIRST("MIN_IO_FIRST"),
     
INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY("INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY"),
@@ -44,7 +62,7 @@ public interface ILoadDiskSelector {
 
     private final String value;
 
-    LoadDiskSelectorType(String value) {
+    LoadDiskSelectorType(final String value) {
       this.value = value;
     }
 
@@ -52,7 +70,7 @@ public interface ILoadDiskSelector {
       return value;
     }
 
-    public static LoadDiskSelectorType fromValue(String value) {
+    public static LoadDiskSelectorType fromValue(final String value) {
       if (value.equalsIgnoreCase(MIN_IO_FIRST.getValue())) {
         return MIN_IO_FIRST;
       } else if 
(value.equalsIgnoreCase(INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY.getValue())) 
{
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
index 7739c42a538..c56addc4bf9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java
@@ -21,56 +21,28 @@ package org.apache.iotdb.db.storageengine.load.disk;
 
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.load.LoadFileException;
-import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
 
 import org.apache.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
 public class InheritSystemMultiDisksStrategySelector implements 
ILoadDiskSelector {
 
-  protected final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
-  private static final Logger logger =
-      LoggerFactory.getLogger(InheritSystemMultiDisksStrategySelector.class);
+  protected static final FSFactory fsFactory = 
FSFactoryProducer.getFSFactory();
 
-  public InheritSystemMultiDisksStrategySelector() {
-    // empty body
+  protected final DiskDirectorySelector directorySelector;
+
+  public InheritSystemMultiDisksStrategySelector(final DiskDirectorySelector 
selector) {
+    this.directorySelector = selector;
   }
 
-  public File getTargetFile(
-      File fileToLoad,
-      String databaseName,
-      String dataRegionId,
-      long filePartitionId,
-      String tsfileName,
-      int tierLevel)
+  public File selectTargetDirectory(
+      final File sourceDirectory,
+      final String FileName,
+      final boolean appendFileName,
+      final int tierLevel)
       throws DiskSpaceInsufficientException, LoadFileException {
-    try {
-      return TierManager.getInstance()
-          .getFolderManager(tierLevel, false)
-          .getNextWithRetry(
-              folder -> {
-                return fsFactory.getFile(
-                    folder,
-                    databaseName
-                        + File.separatorChar
-                        + dataRegionId
-                        + File.separatorChar
-                        + filePartitionId
-                        + File.separator
-                        + tsfileName);
-              });
-    } catch (DiskSpaceInsufficientException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new LoadFileException(
-          String.format(
-              "Storage allocation failed for %s/%s/%s (tier %d)",
-              databaseName, dataRegionId, filePartitionId, tierLevel),
-          e);
-    }
+    return directorySelector.selectDirectory(sourceDirectory, FileName, 
tierLevel);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
index d18be675f25..9956148b083 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java
@@ -19,9 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.load.disk;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.metrics.utils.FileStoreUtils;
@@ -32,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.nio.file.FileStore;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -40,15 +38,20 @@ import java.util.Optional;
 public class MinIOSelector extends InheritSystemMultiDisksStrategySelector {
 
   private static final Logger logger = 
LoggerFactory.getLogger(MinIOSelector.class);
-  private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
   private final Map<String, String> rootDisks2DataDirsMapForLoad;
 
-  public MinIOSelector() {
+  public MinIOSelector(final String[] dirs, final DiskDirectorySelector 
selector) {
+    super(selector);
+    if (dirs == null || dirs.length == 0) {
+      rootDisks2DataDirsMapForLoad = Collections.emptyMap();
+      logger.warn("MinIO selector requires at least one directory");
+      return;
+    }
     // init data dirs' root disks
-    this.rootDisks2DataDirsMapForLoad = new 
HashMap<>(config.getTierDataDirs()[0].length);
-    Arrays.stream(config.getTierDataDirs()[0])
+    this.rootDisks2DataDirsMapForLoad = new HashMap<>(dirs.length);
+    Arrays.stream(dirs)
         .filter(Objects::nonNull)
-        .map(v -> fsFactory.getFile(v, 
IoTDBConstant.UNSEQUENCE_FOLDER_NAME).getPath())
         .forEach(
             dataDirPath -> {
               File dataDirFile = new File(dataDirPath);
@@ -70,44 +73,39 @@ public class MinIOSelector extends 
InheritSystemMultiDisksStrategySelector {
   }
 
   @Override
-  public File getTargetFile(
-      File fileToLoad,
-      String databaseName,
-      String dataRegionId,
-      long filePartitionId,
-      String tsfileName,
-      int tierLevel)
+  public File selectTargetDirectory(
+      final File sourceDirectory,
+      final String fileName,
+      final boolean appendFileName,
+      final int tierLevel)
       throws DiskSpaceInsufficientException, LoadFileException {
-    File targetFile;
     String fileDirRoot = null;
     try {
       fileDirRoot =
-          
Optional.ofNullable(FileStoreUtils.getFileStore(fileToLoad.getCanonicalPath()))
+          
Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath()))
               .map(Object::toString)
               .orElse(null);
     } catch (Exception e) {
-      logger.warn("Exception occurs when reading target file's mount point 
{}", filePartitionId, e);
+      logger.warn(
+          "Exception occurs when reading target file's mount point {}",
+          sourceDirectory.getAbsoluteFile(),
+          e);
     }
 
+    File targetFile = null;
     if (rootDisks2DataDirsMapForLoad.containsKey(fileDirRoot)) {
-      // if there is an overlap between firDirRoot and data directories' disk 
roots, try to get
-      // targetFile in the same disk
-      targetFile =
-          fsFactory.getFile(
-              rootDisks2DataDirsMapForLoad.get(fileDirRoot),
-              databaseName
-                  + File.separatorChar
-                  + dataRegionId
-                  + File.separatorChar
-                  + filePartitionId
-                  + File.separator
-                  + tsfileName);
+      if (appendFileName) {
+        // if there is an overlap between firDirRoot and data directories' 
disk roots, try to get
+        // targetFile in the same disk
+        targetFile = 
fsFactory.getFile(rootDisks2DataDirsMapForLoad.get(fileDirRoot), fileName);
+      } else {
+        targetFile = new File(rootDisks2DataDirsMapForLoad.get(fileDirRoot));
+      }
 
       return targetFile;
     }
 
     // if there isn't an overlap, downgrade to storage balance(sequence) 
strategy.
-    return super.getTargetFile(
-        fileToLoad, databaseName, dataRegionId, filePartitionId, tsfileName, 
tierLevel);
+    return super.selectTargetDirectory(sourceDirectory, fileName, 
appendFileName, tierLevel);
   }
 }


Reply via email to