This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 76a5f5d848c Pipe: decouple pipe receiver directory from the system 
directory and add support for multiple folders (#11333)
76a5f5d848c is described below

commit 76a5f5d848c2f4a4262f210a880ab3c5e19463c5
Author: V_Galaxy <[email protected]>
AuthorDate: Wed Oct 18 11:06:55 2023 +0800

    Pipe: decouple pipe receiver directory from the system directory and add 
support for multiple folders (#11333)
    
    **NOTE:** In https://github.com/apache/iotdb/pull/11318 and THIS PR, it is 
assumed that `systemDir` will be initialized with the user-specified value 
**before** the call to `getPipeReceiverFileDir(s)`.
---
 .../resources/conf/iotdb-datanode.properties       | 13 ++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 18 +++++-----
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  8 +++--
 .../db/pipe/agent/receiver/PipeReceiverAgent.java  | 14 +++++---
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |  2 +-
 .../receiver/thrift/IoTDBThriftReceiverV1.java     | 39 ++++++++++++++++++++--
 6 files changed, 77 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties 
b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
index 2170c6ec004..c8aef63aa1b 100644
--- a/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
+++ b/iotdb-core/datanode/src/assembly/resources/conf/iotdb-datanode.properties
@@ -223,6 +223,19 @@ dn_target_config_node_list=127.0.0.1:10710
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # sort_tmp_dir=data/datanode/tmp
 
+# pipe_receiver_file_dirs
+# If this property is unset, system will save the data in the default relative 
path directory under the IoTDB folder(i.e., 
%IOTDB_HOME%/${dn_system_dir}/pipe/receiver).
+# If it is absolute, system will save the data in the exact location it points 
to.
+# If it is relative, system will save the data in the relative path directory 
it indicates under the IoTDB folder.
+# If there are more than one directory, please separate them by commas ",".
+# Note: If pipe_receiver_file_dirs is assigned an empty 
string(i.e.,zero-size), it will be handled as a relative path.
+# For windows platform
+# If its prefix is a drive specifier followed by "\\", or if its prefix is 
"\\\\", then the path is absolute. Otherwise, it is relative.
+# pipe_receiver_file_dirs=data\\datanode\\system\\pipe\\receiver
+# For Linux platform
+# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
+# pipe_receiver_file_dirs=data/datanode/system/pipe/receiver
+
 ####################
 ### Metric Configuration
 ####################
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e5594f85f81..9c06c97e7d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1081,7 +1081,7 @@ public class IoTDBConfig {
 
   /** Pipe related */
   /** initialized as null, updated based on the latest `systemDir` during 
querying */
-  private String pipeReceiverFileDir = null;
+  private String[] pipeReceiverFileDirs = null;
 
   /** Resource control */
   private boolean quotaEnable = false;
@@ -1214,7 +1214,9 @@ public class IoTDBConfig {
     triggerTemporaryLibDir = addDataHomeDir(triggerTemporaryLibDir);
     pipeDir = addDataHomeDir(pipeDir);
     pipeTemporaryLibDir = addDataHomeDir(pipeTemporaryLibDir);
-    pipeReceiverFileDir = addDataHomeDir(pipeReceiverFileDir);
+    for (int i = 0; i < pipeReceiverFileDirs.length; i++) {
+      pipeReceiverFileDirs[i] = addDataHomeDir(pipeReceiverFileDirs[i]);
+    }
     mqttDir = addDataHomeDir(mqttDir);
     extPipeDir = addDataHomeDir(extPipeDir);
     queryDir = addDataHomeDir(queryDir);
@@ -3708,14 +3710,14 @@ public class IoTDBConfig {
     return modeMapSizeThreshold;
   }
 
-  public void setPipeReceiverFileDir(String pipeReceiverFileDir) {
-    this.pipeReceiverFileDir = pipeReceiverFileDir;
+  public void setPipeReceiverFileDirs(String[] pipeReceiverFileDirs) {
+    this.pipeReceiverFileDirs = pipeReceiverFileDirs;
   }
 
-  public String getPipeReceiverFileDir() {
-    return Objects.isNull(this.pipeReceiverFileDir)
-        ? (systemDir + File.separator + "pipe" + File.separator + "receiver")
-        : this.pipeReceiverFileDir;
+  public String[] getPipeReceiverFileDirs() {
+    return Objects.isNull(this.pipeReceiverFileDirs)
+        ? new String[] {systemDir + File.separator + "pipe" + File.separator + 
"receiver"}
+        : this.pipeReceiverFileDirs;
   }
 
   public boolean isQuotaEnable() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 7850bf6b103..2e94b53f2bb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1972,8 +1972,12 @@ public class IoTDBDescriptor {
   private void loadPipeProps(Properties properties) {
     conf.setPipeLibDir(properties.getProperty("pipe_lib_dir", 
conf.getPipeLibDir()));
 
-    conf.setPipeReceiverFileDir(
-        properties.getProperty("pipe_receiver_file_dir", 
conf.getPipeReceiverFileDir()));
+    conf.setPipeReceiverFileDirs(
+        properties
+            .getProperty(
+                "pipe_receiver_file_dirs", String.join(",", 
conf.getPipeReceiverFileDirs()))
+            .trim()
+            .split(","));
   }
 
   private void loadCQProps(Properties properties) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
index 53967e17757..b2af4f2258f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeReceiverAgent.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 
 /** PipeReceiverAgent is the entry point of all pipe receivers' logic. */
 public class PipeReceiverAgent {
@@ -58,10 +59,7 @@ public class PipeReceiverAgent {
     return legacyAgent;
   }
 
-  public void cleanPipeReceiverDir() {
-    final File receiverFileDir =
-        new 
File(IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDir());
-
+  private static void cleanPipeReceiverDir(File receiverFileDir) {
     try {
       FileUtils.deleteDirectory(receiverFileDir);
       LOGGER.info("Clean pipe receiver dir {} successfully.", receiverFileDir);
@@ -76,4 +74,12 @@ public class PipeReceiverAgent {
       LOGGER.warn("Create pipe receiver dir {} failed.", receiverFileDir, e);
     }
   }
+
+  public void cleanPipeReceiverDirs() {
+    String[] pipeReceiverFileDirs =
+        IoTDBDescriptor.getInstance().getConfig().getPipeReceiverFileDirs();
+    Arrays.stream(pipeReceiverFileDirs)
+        .map(File::new)
+        .forEach(PipeReceiverAgent::cleanPipeReceiverDir);
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 37f56b7835f..dc50ba94bee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -59,7 +59,7 @@ public class PipeRuntimeAgent implements IService {
     PipeHardlinkFileDirStartupCleaner.clean();
 
     // clean receiver file dir
-    PipeAgent.receiver().cleanPipeReceiverDir();
+    PipeAgent.receiver().cleanPipeReceiverDirs();
 
     PipeAgentLauncher.launchPipePluginAgent(resourcesInformationHolder);
     simpleConsensusProgressIndexAssigner.start();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
index a3039214073..6d8253e0a40 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java
@@ -21,9 +21,11 @@ package org.apache.iotdb.db.pipe.receiver.thrift;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.db.auth.AuthorityChecker;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import 
org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest;
 import org.apache.iotdb.db.pipe.connector.payload.evolvable.PipeRequestType;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp;
@@ -49,6 +51,8 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement
 import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedInsertBaseStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.PipeEnrichedLoadTsFileStatement;
+import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
+import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -63,6 +67,8 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.time.ZoneId;
+import java.util.Arrays;
+import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -73,7 +79,21 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
 
   private static final IoTDBConfig IOTDB_CONFIG = 
IoTDBDescriptor.getInstance().getConfig();
-  private static final String RECEIVER_FILE_BASE_DIR = 
IOTDB_CONFIG.getPipeReceiverFileDir();
+  private static final String[] RECEIVER_FILE_BASE_DIRS = 
IOTDB_CONFIG.getPipeReceiverFileDirs();
+  private static FolderManager folderManager = null;
+
+  static {
+    try {
+      folderManager =
+          new FolderManager(
+              Arrays.asList(RECEIVER_FILE_BASE_DIRS), 
DirectoryStrategyType.SEQUENCE_STRATEGY);
+    } catch (DiskSpaceInsufficientException e) {
+      LOGGER.error(
+          "Fail to create pipe receiver file folders allocation strategy 
because all disks of folders are full.",
+          e);
+    }
+  }
+
   private final AtomicReference<File> receiverFileDirWithIdSuffix = new 
AtomicReference<>();
 
   // Used to generate transfer id, which is used to identify a receiver thread.
@@ -182,8 +202,23 @@ public class IoTDBThriftReceiverV1 implements 
IoTDBThriftReceiver {
       LOGGER.info("Current receiver file dir is null. No need to delete.");
     }
 
+    // get next receiver file base dir by folder manager
+    if (Objects.isNull(folderManager)) {
+      LOGGER.error(
+          "Failed to init pipe receiver file folder manager because all disks 
of folders are full.");
+      return new 
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
+    }
+    String receiverFileBaseDir;
+    try {
+      receiverFileBaseDir = folderManager.getNextFolder();
+    } catch (DiskSpaceInsufficientException e) {
+      LOGGER.error(
+          "Fail to create pipe receiver file folder because all disks of 
folders are full.", e);
+      return new 
TPipeTransferResp(StatusUtils.getStatus(TSStatusCode.DISK_SPACE_INSUFFICIENT));
+    }
+
     // create a new receiver file dir
-    final File newReceiverDir = new File(RECEIVER_FILE_BASE_DIR, 
Long.toString(receiverId.get()));
+    final File newReceiverDir = new File(receiverFileBaseDir, 
Long.toString(receiverId.get()));
     if (!newReceiverDir.exists()) {
       if (newReceiverDir.mkdirs()) {
         LOGGER.info("Receiver file dir {} was created.", 
newReceiverDir.getPath());

Reply via email to