This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new fbc8828b81b [IOTDB-5921] Pipe: config & descriptor (#9957)
fbc8828b81b is described below

commit fbc8828b81b52165085ded01ea599dd455553058
Author: Itami Sho <[email protected]>
AuthorDate: Tue May 30 19:01:37 2023 +0800

    [IOTDB-5921] Pipe: config & descriptor (#9957)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../manager/load/service/HeartbeatService.java     |   8 +-
 .../manager/pipe/runtime/PipeMetaSyncer.java       |   8 +-
 .../resources/conf/iotdb-common.properties         |  54 +++++--
 .../apache/iotdb/commons/conf/CommonConfig.java    | 159 +++++++++++++++++++++
 .../iotdb/commons/conf/CommonDescriptor.java       |  87 +++++++++++
 .../iotdb/commons/pipe/config/PipeConfig.java      | 151 +++++++++++++++++++
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  16 +--
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  12 +-
 .../db/pipe/agent/runtime/PipeAgentLauncher.java   |   4 +-
 .../db/pipe/agent/runtime/PipeRuntimeAgent.java    |   2 +
 .../apache/iotdb/db/pipe/config/PipeConfig.java    |  93 ------------
 .../PipeRealtimeDataRegionHybridCollector.java     |   4 +-
 .../realtime/assigner/DisruptorQueue.java          |   5 +-
 .../matcher/CachedSchemaPatternMatcher.java        |   6 +-
 .../impl/iotdb/v1/IoTDBThriftConnectorV1.java      |   4 +-
 .../impl/iotdb/v1/IoTDBThriftReceiverV1.java       |   7 +-
 .../manager/PipeConnectorSubtaskManager.java       |   5 +-
 .../executor/PipeAssignerSubtaskExecutor.java      |   4 +-
 .../executor/PipeConnectorSubtaskExecutor.java     |   4 +-
 .../executor/PipeProcessorSubtaskExecutor.java     |   4 +-
 .../execution/scheduler/PipeSubtaskScheduler.java  |   9 +-
 .../resource/file/PipeFileResourceManager.java     |  10 +-
 .../file/PipeHardlinkFileDirStartupCleaner.java    |   5 +-
 .../task/queue/ListenableBlockingPendingQueue.java |   4 +-
 .../db/pipe/task/subtask/PipeConnectorSubtask.java |   4 +-
 25 files changed, 507 insertions(+), 162 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 38a6d8dc5f8..3e91b2b4013 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
 import 
org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
 import 
org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
@@ -124,8 +125,11 @@ public class HeartbeatService {
     // We sample DataNode's load in every 10 heartbeat loop
     heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
     
heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount());
-    // We collect pipe meta in every 100 heartbeat loop, TODO: make this 
configurable
-    heartbeatReq.setNeedPipeMetaList(heartbeatCounter.get() % 100 == 0);
+    // We collect pipe meta once every N heartbeat loops (N is configurable, 100 by default)
+    heartbeatReq.setNeedPipeMetaList(
+        heartbeatCounter.get()
+                % 
PipeConfig.getInstance().getHeartbeatLoopCyclesForCollectingPipeMeta()
+            == 0);
     if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
       
heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
       
heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index 4d6ebb04ff0..2251d0d7892 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -40,9 +41,10 @@ public class PipeMetaSyncer {
   private static final ScheduledExecutorService SYNC_EXECUTOR =
       IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
           ThreadName.PIPE_META_SYNC_SERVICE.getName());
-  // TODO: make these configurable
-  private static final long INITIAL_SYNC_DELAY_MINUTES = 3;
-  private static final long SYNC_INTERVAL_MINUTES = 3;
+  private static final long INITIAL_SYNC_DELAY_MINUTES =
+      PipeConfig.getInstance().getPipeMetaSyncerInitialSyncDelayMinutes();
+  private static final long SYNC_INTERVAL_MINUTES =
+      PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes();
 
   private final ConfigManager configManager;
 
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties 
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index e21a6b39118..edbee060d08 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -963,7 +963,7 @@ cluster_name=defaultCluster
 # continuous_query_min_every_interval_in_ms=1000
 
 ####################
-### PIPE Configuration
+### Pipe Configuration
 ####################
 
 # Uncomment the following field to configure the pipe lib directory.
@@ -975,18 +975,52 @@ cluster_name=defaultCluster
 # If its prefix is "/", then the path is absolute. Otherwise, it is relative.
 # pipe_lib_dir=ext/pipe
 
+# The name of the directory that stores the tsfiles temporarily held or 
generated by the pipe module.
+# The directory is located in the data directory of IoTDB.
+# pipe_hardlink_tsfile_dir_name=pipe
+
 # The maximum number of threads that can be used to execute the pipe subtasks 
in PipeSubtaskExecutor.
-# pipe_max_thread_num = 5
+# pipe_subtask_executor_max_thread_num=5
 
-# White IP list of Sync client.
-# Please use the form of IPv4 network segment to present the range of IP, for 
example: 192.168.0.0/16
-# If there are more than one IP segment, please separate them by commas
-# The default is to reject all IP to sync except 127.0.0.1
-# Datatype: String
-# ip_white_list=127.0.0.1/32
+# The number of events that need to be consumed before a checkpoint is 
triggered.
+# 
pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count=10000
+
+# The time duration (in milliseconds) between checkpoints.
+# pipe_subtask_executor_basic_check_point_interval_by_time_duration=10000
+
+# The maximum blocking time (in milliseconds) for the pending queue.
+# pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000
+
+# The default size of ring buffer in the realtime collector's disruptor queue.
+# pipe_collector_assigner_disruptor_ring_buffer_size=65536
+
+# The maximum number of entries the deviceToCollectorsCache can hold.
+# pipe_collector_matcher_cache_size=1024
+
+# The capacity for the number of tablet events that can be stored in the 
pending queue of the Hybrid Realtime Collector.
+# pipe_collector_pending_queue_capacity=128
+
+# The limit for the number of tablet events that can be held in the pending 
queue of the Hybrid Realtime Collector.
+# Note that this should be less than or equal to 
pipe_collector_pending_queue_capacity.
+# pipe_collector_pending_queue_tablet_limit=64
+
+# The buffer size used for reading file during file transfer.
+# pipe_connector_read_file_buffer_size=8388608
+
+# The delay period (in milliseconds) between each retry when a connection 
failure occurs.
+# pipe_connector_retry_interval_ms=1000
+
+# The size of the pending queue for the PipeConnector to store the events.
+# pipe_connector_pending_queue_size=1024
+
+# The number of heartbeat loop cycles before collecting pipe meta once
+# pipe_heartbeat_loop_cycles_for_collecting_pipe_meta=100
+
+# The initial delay (in minutes) before starting the PipeMetaSyncer service.
+# pipe_meta_syncer_initial_sync_delay_minutes=3
 
-# The maximum number of retry when syncing a file to receiver fails.
-# max_number_of_sync_file_retry=5
+# The regular sync interval (in minutes) for the PipeMetaSyncer service.
+# pipe_meta_syncer_sync_interval_minutes=3
 
 ####################
 ### RatisConsensus Configuration
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a44810612f2..e5a9fae2d8d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -130,6 +130,35 @@ public class CommonConfig {
 
   private String timestampPrecision = "ms";
 
+  /** Pipe related */
+
+  /**
+   * The name of the directory that stores the tsfiles temporarily held or 
generated by the pipe
+   * module. The directory is located in the data directory of IoTDB.
+   */
+  private String pipeHardlinkTsFileDirName = "pipe";
+
+  /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor */
+  private int pipeSubtaskExecutorMaxThreadNum = 5;
+
+  private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 
10_000;
+  private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 
1000L;
+  private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
+
+  private int pipeCollectorAssignerDisruptorRingBufferSize = 65536;
+  private int pipeCollectorMatcherCacheSize = 1024;
+  private int pipeCollectorPendingQueueCapacity = 128;
+  // this should be less than or equal to 
pipeCollectorPendingQueueCapacity
+  private int pipeCollectorPendingQueueTabletLimit = 
pipeCollectorPendingQueueCapacity / 2;
+
+  private int pipeConnectorReadFileBufferSize = 8388608;
+  private long pipeConnectorRetryIntervalMs = 1000L;
+  private int pipeConnectorPendingQueueSize = 1024;
+
+  private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
+  private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
+  private long pipeMetaSyncerSyncIntervalMinutes = 3;
+
   CommonConfig() {}
 
   public void updatePath(String homeDir) {
@@ -390,4 +419,134 @@ public class CommonConfig {
   public String getTimestampPrecision() {
     return timestampPrecision;
   }
+
+  public String getPipeHardlinkTsFileDirName() {
+    return pipeHardlinkTsFileDirName;
+  }
+
+  public void setPipeHardlinkTsFileDirName(String pipeHardlinkTsFileDirName) {
+    this.pipeHardlinkTsFileDirName = pipeHardlinkTsFileDirName;
+  }
+
+  public int getPipeCollectorAssignerDisruptorRingBufferSize() {
+    return pipeCollectorAssignerDisruptorRingBufferSize;
+  }
+
+  public void setPipeCollectorAssignerDisruptorRingBufferSize(
+      int pipeCollectorAssignerDisruptorRingBufferSize) {
+    this.pipeCollectorAssignerDisruptorRingBufferSize =
+        pipeCollectorAssignerDisruptorRingBufferSize;
+  }
+
+  public int getPipeCollectorMatcherCacheSize() {
+    return pipeCollectorMatcherCacheSize;
+  }
+
+  public void setPipeCollectorMatcherCacheSize(int 
pipeCollectorMatcherCacheSize) {
+    this.pipeCollectorMatcherCacheSize = pipeCollectorMatcherCacheSize;
+  }
+
+  public int getPipeCollectorPendingQueueCapacity() {
+    return pipeCollectorPendingQueueCapacity;
+  }
+
+  public void setPipeCollectorPendingQueueCapacity(int 
pipeCollectorPendingQueueCapacity) {
+    this.pipeCollectorPendingQueueCapacity = pipeCollectorPendingQueueCapacity;
+  }
+
+  public int getPipeCollectorPendingQueueTabletLimit() {
+    return pipeCollectorPendingQueueTabletLimit;
+  }
+
+  public void setPipeCollectorPendingQueueTabletLimit(int 
pipeCollectorPendingQueueTabletLimit) {
+    this.pipeCollectorPendingQueueTabletLimit = 
pipeCollectorPendingQueueTabletLimit;
+  }
+
+  public int getPipeConnectorReadFileBufferSize() {
+    return pipeConnectorReadFileBufferSize;
+  }
+
+  public void setPipeConnectorReadFileBufferSize(int 
pipeConnectorReadFileBufferSize) {
+    this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
+  }
+
+  public int getPipeHeartbeatLoopCyclesForCollectingPipeMeta() {
+    return pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+  }
+
+  public void setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
+      int pipeHeartbeatLoopCyclesForCollectingPipeMeta) {
+    this.pipeHeartbeatLoopCyclesForCollectingPipeMeta =
+        pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+  }
+
+  public long getPipeMetaSyncerInitialSyncDelayMinutes() {
+    return pipeMetaSyncerInitialSyncDelayMinutes;
+  }
+
+  public void setPipeMetaSyncerInitialSyncDelayMinutes(long 
pipeMetaSyncerInitialSyncDelayMinutes) {
+    this.pipeMetaSyncerInitialSyncDelayMinutes = 
pipeMetaSyncerInitialSyncDelayMinutes;
+  }
+
+  public long getPipeMetaSyncerSyncIntervalMinutes() {
+    return pipeMetaSyncerSyncIntervalMinutes;
+  }
+
+  public void setPipeMetaSyncerSyncIntervalMinutes(long 
pipeMetaSyncerSyncIntervalMinutes) {
+    this.pipeMetaSyncerSyncIntervalMinutes = pipeMetaSyncerSyncIntervalMinutes;
+  }
+
+  public long getPipeConnectorRetryIntervalMs() {
+    return pipeConnectorRetryIntervalMs;
+  }
+
+  public void setPipeConnectorRetryIntervalMs(long 
pipeConnectorRetryIntervalMs) {
+    this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
+  }
+
+  public int getPipeConnectorPendingQueueSize() {
+    return pipeConnectorPendingQueueSize;
+  }
+
+  public void setPipeConnectorPendingQueueSize(int 
pipeConnectorPendingQueueSize) {
+    this.pipeConnectorPendingQueueSize = pipeConnectorPendingQueueSize;
+  }
+
+  public int 
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
+    return pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
+  }
+
+  public void 
setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
+      int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount) {
+    this.pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
+        pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
+  }
+
+  public long getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration() {
+    return pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration;
+  }
+
+  public void setPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration(
+      long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration) {
+    this.pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration =
+        pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration;
+  }
+
+  public int getPipeSubtaskExecutorMaxThreadNum() {
+    return pipeSubtaskExecutorMaxThreadNum;
+  }
+
+  public void setPipeSubtaskExecutorMaxThreadNum(int 
pipeSubtaskExecutorMaxThreadNum) {
+    this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum;
+  }
+
+  public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
+    return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
+  }
+
+  public void setPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(
+      long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs) {
+    this.pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs =
+        pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
+  }
 }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ef947180567..bd72a56ae88 100644
--- 
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -207,6 +207,8 @@ public class CommonDescriptor {
         properties.getProperty(
             "target_ml_node_endpoint",
             
NodeUrlUtils.convertTEndPointUrl(config.getTargetMLNodeEndPoint()));
+
+    loadPipeProps(properties);
     try {
       
config.setTargetMLNodeEndPoint(NodeUrlUtils.parseTEndPointUrl(endPointUrl));
     } catch (BadNodeUrlException e) {
@@ -216,6 +218,91 @@ public class CommonDescriptor {
     }
   }
 
+  private void loadPipeProps(Properties properties) {
+    config.setPipeHardlinkTsFileDirName(
+        properties.getProperty(
+            "pipe_hardlink_tsfile_dir_name", 
config.getPipeHardlinkTsFileDirName()));
+
+    config.setPipeSubtaskExecutorMaxThreadNum(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_subtask_executor_max_thread_num",
+                
Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum()))));
+    if (config.getPipeSubtaskExecutorMaxThreadNum() <= 0) {
+      config.setPipeSubtaskExecutorMaxThreadNum(5);
+    }
+    config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
+        Integer.parseInt(
+            properties.getProperty(
+                
"pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count",
+                String.valueOf(
+                    
config.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount()))));
+    config.setPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration(
+        Integer.parseInt(
+            properties.getProperty(
+                
"pipe_subtask_executor_basic_check_point_interval_by_time_duration",
+                String.valueOf(
+                    
config.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration()))));
+    config.setPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
+                
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+
+    config.setPipeCollectorAssignerDisruptorRingBufferSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_collector_assigner_disruptor_ring_buffer_size",
+                
String.valueOf(config.getPipeCollectorAssignerDisruptorRingBufferSize()))));
+    config.setPipeCollectorMatcherCacheSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_collector_matcher_cache_size",
+                String.valueOf(config.getPipeCollectorMatcherCacheSize()))));
+    config.setPipeCollectorPendingQueueCapacity(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_collector_pending_queue_capacity",
+                
String.valueOf(config.getPipeCollectorPendingQueueCapacity()))));
+    config.setPipeCollectorPendingQueueTabletLimit(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_collector_pending_queue_tablet_limit",
+                
String.valueOf(config.getPipeCollectorPendingQueueTabletLimit()))));
+
+    config.setPipeConnectorReadFileBufferSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_connector_read_file_buffer_size",
+                String.valueOf(config.getPipeConnectorReadFileBufferSize()))));
+    config.setPipeConnectorRetryIntervalMs(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_connector_retry_interval_ms",
+                String.valueOf(config.getPipeConnectorRetryIntervalMs()))));
+    config.setPipeConnectorPendingQueueSize(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_connector_pending_queue_size",
+                String.valueOf(config.getPipeConnectorPendingQueueSize()))));
+
+    config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_heartbeat_loop_cycles_for_collecting_pipe_meta",
+                
String.valueOf(config.getPipeHeartbeatLoopCyclesForCollectingPipeMeta()))));
+    config.setPipeMetaSyncerInitialSyncDelayMinutes(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_meta_syncer_initial_sync_delay_minutes",
+                
String.valueOf(config.getPipeMetaSyncerInitialSyncDelayMinutes()))));
+    config.setPipeMetaSyncerSyncIntervalMinutes(
+        Integer.parseInt(
+            properties.getProperty(
+                "pipe_meta_syncer_sync_interval_minutes",
+                
String.valueOf(config.getPipeMetaSyncerSyncIntervalMinutes()))));
+  }
+
   public void loadGlobalConfig(TGlobalConfig globalConfig) {
     
config.setDiskSpaceWarningThreshold(globalConfig.getDiskSpaceWarningThreshold());
   }
diff --git 
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
new file mode 100644
index 00000000000..365ea0b8003
--- /dev/null
+++ 
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PipeConfig {
+
+  private final CommonConfig COMMON_CONFIG = 
CommonDescriptor.getInstance().getConfig();
+
+  /////////////////////////////// File ///////////////////////////////
+
+  public String getPipeHardlinkTsFileDirName() {
+    return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
+  }
+
+  /////////////////////////////// Subtask Executor 
///////////////////////////////
+
+  public int getPipeSubtaskExecutorMaxThreadNum() {
+    return COMMON_CONFIG.getPipeSubtaskExecutorMaxThreadNum();
+  }
+
+  public int 
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
+    return 
COMMON_CONFIG.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount();
+  }
+
+  public long getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration() {
+    return 
COMMON_CONFIG.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration();
+  }
+
+  public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
+    return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
+  }
+
+  /////////////////////////////// Collector ///////////////////////////////
+
+  public int getPipeCollectorAssignerDisruptorRingBufferSize() {
+    return COMMON_CONFIG.getPipeCollectorAssignerDisruptorRingBufferSize();
+  }
+
+  public int getPipeCollectorMatcherCacheSize() {
+    return COMMON_CONFIG.getPipeCollectorMatcherCacheSize();
+  }
+
+  public int getPipeCollectorPendingQueueCapacity() {
+    return COMMON_CONFIG.getPipeCollectorPendingQueueCapacity();
+  }
+
+  public int getPipeCollectorPendingQueueTabletLimit() {
+    return COMMON_CONFIG.getPipeCollectorPendingQueueTabletLimit();
+  }
+
+  /////////////////////////////// Connector ///////////////////////////////
+
+  public int getPipeConnectorReadFileBufferSize() {
+    return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+  }
+
+  public long getPipeConnectorRetryIntervalMs() {
+    return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+  }
+
+  public int getPipeConnectorPendingQueueSize() {
+    return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
+  }
+
+  /////////////////////////////// Meta Consistency 
///////////////////////////////
+
+  public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
+    return COMMON_CONFIG.getPipeHeartbeatLoopCyclesForCollectingPipeMeta();
+  }
+
+  public long getPipeMetaSyncerInitialSyncDelayMinutes() {
+    return COMMON_CONFIG.getPipeMetaSyncerInitialSyncDelayMinutes();
+  }
+
+  public long getPipeMetaSyncerSyncIntervalMinutes() {
+    return COMMON_CONFIG.getPipeMetaSyncerSyncIntervalMinutes();
+  }
+
+  /////////////////////////////// Utils ///////////////////////////////
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeConfig.class);
+
+  public void printAllConfigs() {
+    LOGGER.info("PipeHardlinkTsFileDirName: {}", 
getPipeHardlinkTsFileDirName());
+
+    LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}", 
getPipeSubtaskExecutorMaxThreadNum());
+    LOGGER.info(
+        "PipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount: {}",
+        getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount());
+    LOGGER.info(
+        "PipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration: {}",
+        getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration());
+    LOGGER.info(
+        "PipeSubtaskExecutorPendingQueueMaxBlockingTimeMs: {}",
+        getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs());
+
+    LOGGER.info(
+        "PipeCollectorAssignerDisruptorRingBufferSize: {}",
+        getPipeCollectorAssignerDisruptorRingBufferSize());
+    LOGGER.info("PipeCollectorMatcherCacheSize: {}", 
getPipeCollectorMatcherCacheSize());
+    LOGGER.info("PipeCollectorPendingQueueCapacity: {}", 
getPipeCollectorPendingQueueCapacity());
+    LOGGER.info(
+        "PipeCollectorPendingQueueTabletLimit: {}", 
getPipeCollectorPendingQueueTabletLimit());
+
+    LOGGER.info("PipeConnectorReadFileBufferSize: {}", 
getPipeConnectorReadFileBufferSize());
+    LOGGER.info("PipeConnectorRetryIntervalMs: {}", 
getPipeConnectorRetryIntervalMs());
+    LOGGER.info("PipeConnectorPendingQueueSize: {}", 
getPipeConnectorPendingQueueSize());
+
+    LOGGER.info(
+        "HeartbeatLoopCyclesForCollectingPipeMeta: {}",
+        getHeartbeatLoopCyclesForCollectingPipeMeta());
+    LOGGER.info(
+        "PipeMetaSyncerInitialSyncDelayMinutes: {}", 
getPipeMetaSyncerInitialSyncDelayMinutes());
+    LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}", 
getPipeMetaSyncerSyncIntervalMinutes());
+  }
+
+  /////////////////////////////// Singleton ///////////////////////////////
+
+  private PipeConfig() {}
+
+  public static PipeConfig getInstance() {
+    return PipeConfigHolder.INSTANCE;
+  }
+
+  private static class PipeConfigHolder {
+    private static final PipeConfig INSTANCE = new PipeConfig();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 48c3e0f853d..06fcdb54b34 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1120,8 +1120,8 @@ public class IoTDBConfig {
   private int maxPendingBatchesNum = 12;
   private double maxMemoryRatioForQueue = 0.6;
 
-  /** The maximum number of threads that can be used to execute subtasks in 
PipeSubtaskExecutor */
-  private int pipeMaxThreadNum = 5;
+  /** Pipe related */
+  private String pipeReceiveFileDir = systemDir + File.separator + "pipe";
 
   /** Resource control */
   private boolean quotaEnable = false;
@@ -1541,11 +1541,11 @@ public class IoTDBConfig {
     this.triggerTemporaryLibDir = triggerDir + File.separator + 
IoTDBConstant.TMP_FOLDER_NAME;
   }
 
-  public String getPipeDir() {
+  public String getPipeLibDir() {
     return pipeDir;
   }
 
-  public void setPipeDir(String pipeDir) {
+  public void setPipeLibDir(String pipeDir) {
     this.pipeDir = pipeDir;
     updatePipeTemporaryLibDir();
   }
@@ -3886,12 +3886,12 @@ public class IoTDBConfig {
     return modeMapSizeThreshold;
   }
 
-  public void setPipeSubtaskExecutorMaxThreadNum(int pipeMaxThreadNum) {
-    this.pipeMaxThreadNum = pipeMaxThreadNum;
+  public void setPipeReceiverFileDir(String pipeReceiveFileDir) {
+    this.pipeReceiveFileDir = pipeReceiveFileDir;
   }
 
-  public int getPipeSubtaskExecutorMaxThreadNum() {
-    return pipeMaxThreadNum;
+  public String getPipeReceiverFileDir() {
+    return pipeReceiveFileDir;
   }
 
   public boolean isQuotaEnable() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java 
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 040ca1938d9..be0df79e585 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1952,16 +1952,10 @@ public class IoTDBDescriptor {
   }
 
   private void loadPipeProps(Properties properties) {
-    conf.setPipeDir(properties.getProperty("pipe_lib_dir", conf.getPipeDir()));
+    conf.setPipeLibDir(properties.getProperty("pipe_lib_dir", 
conf.getPipeLibDir()));
 
-    conf.setPipeSubtaskExecutorMaxThreadNum(
-        Integer.parseInt(
-            properties.getProperty(
-                "pipe_max_thread_num",
-                Integer.toString(conf.getPipeSubtaskExecutorMaxThreadNum()))));
-    if (conf.getPipeSubtaskExecutorMaxThreadNum() <= 0) {
-      conf.setPipeSubtaskExecutorMaxThreadNum(5);
-    }
+    conf.setPipeReceiverFileDir(
+        properties.getProperty("pipe_receiver_file_dir", 
conf.getPipeReceiverFileDir()));
   }
 
   private void loadCQProps(Properties properties) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 12fb4a66152..adf15709aff 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -98,8 +98,8 @@ class PipeAgentLauncher {
   private static void initPipePluginRelatedInstances() throws StartupException 
{
     try {
       PipePluginExecutableManager.setupAndGetInstance(
-          IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
-      
PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir());
+          IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeLibDir());
+      
PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeLibDir());
     } catch (IOException e) {
       throw new StartupException(e);
     }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 5c097b0c444..2c1d5cc85b5 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.pipe.agent.runtime;
 
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
@@ -54,6 +55,7 @@ public class PipeRuntimeAgent implements IService {
 
   @Override
   public synchronized void start() throws StartupException {
+    PipeConfig.getInstance().printAllConfigs();
     PipeAgentLauncher.launchPipeTaskAgent();
 
     isShutdown.set(false);
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
deleted file mode 100644
index 31eff23bc1b..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.config;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import java.io.File;
-
-// TODO: make these parameters configurable
-// TODO: make all pipe related parameters in one place
-// TODO: set the default value of the parameters in IoTDBDescriptor
-// TODO: move it to common module?
-public class PipeConfig {
-
-  /**
-   * The name of the directory that stores the tsfiles temporarily hold or generated by the pipe
-   * module. The directory is located in the data directory of IoTDB.
-   */
-  public static final String PIPE_TSFILE_DIR_NAME = "pipe";
-
-  private final int defaultRingBufferSize = 65536;
-
-  private final int matcherCacheSize = 1024;
-
-  private final int realtimeCollectorPendingQueueCapacity = 65536;
-
-  // this should be less than or equals to realtimeCollectorPendingQueueCapacity
-  private final int realtimeCollectorPendingQueueTabletLimit =
-      realtimeCollectorPendingQueueCapacity / 2;
-
-  private final int readFileBufferSize = 8388608;
-
-  private final long pendingQueueMaxBlockingTimeMs = 1000;
-
-  public int getDefaultRingBufferSize() {
-    return defaultRingBufferSize;
-  }
-
-  public int getMatcherCacheSize() {
-    return matcherCacheSize;
-  }
-
-  public int getRealtimeCollectorPendingQueueCapacity() {
-    return realtimeCollectorPendingQueueCapacity;
-  }
-
-  public int getRealtimeCollectorPendingQueueTabletLimit() {
-    return realtimeCollectorPendingQueueTabletLimit;
-  }
-
-  public String getReceiveFileDir() {
-    return IoTDBDescriptor.getInstance().getConfig().getSystemDir()
-        + File.separator
-        + "pipe"; // TODO: replace with resource manager
-  }
-
-  public int getReadFileBufferSize() {
-    return readFileBufferSize;
-  }
-
-  public long getPendingQueueMaxBlockingTimeMs() {
-    return pendingQueueMaxBlockingTimeMs;
-  }
-
-  /////////////////////////////// Singleton ///////////////////////////////
-
-  private PipeConfig() {}
-
-  public static PipeConfig getInstance() {
-    return PipeConfigHolder.INSTANCE;
-  }
-
-  private static class PipeConfigHolder {
-    private static final PipeConfig INSTANCE = new PipeConfig();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index abb3a109c2c..7ca784dd3ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,9 +19,9 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
 import org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
@@ -111,7 +111,7 @@ public class PipeRealtimeDataRegionHybridCollector extends PipeRealtimeDataRegio
 
   private boolean isApproachingCapacity() {
     return pendingQueue.size()
-        >= PipeConfig.getInstance().getRealtimeCollectorPendingQueueTabletLimit();
+        >= PipeConfig.getInstance().getPipeCollectorPendingQueueTabletLimit();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
index 163b450021c..f0dc0a461be 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
 
-import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
@@ -48,7 +48,8 @@ public class DisruptorQueue<E> {
   }
 
   public static class Builder<E> {
-    private int ringBufferSize = PipeConfig.getInstance().getDefaultRingBufferSize();
+    private int ringBufferSize =
+        PipeConfig.getInstance().getPipeCollectorAssignerDisruptorRingBufferSize();
     private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
     private ProducerType producerType = ProducerType.MULTI;
     private WaitStrategy waitStrategy = new BlockingWaitStrategy();
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
index c8429331730..b68a7e99c0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
 
-import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
 import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -47,7 +47,9 @@ public class CachedSchemaPatternMatcher implements PipeDataRegionMatcher {
     this.lock = new ReentrantReadWriteLock();
     this.collectors = new HashSet<>();
     this.deviceToCollectorsCache =
-        Caffeine.newBuilder().maximumSize(PipeConfig.getInstance().getMatcherCacheSize()).build();
+        Caffeine.newBuilder()
+            .maximumSize(PipeConfig.getInstance().getPipeCollectorMatcherCacheSize())
+            .build();
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 65837f31d42..d320366552e 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
 import org.apache.iotdb.commons.client.property.ThriftClientProperty;
 import org.apache.iotdb.commons.conf.CommonConfig;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorClient;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
@@ -170,7 +170,7 @@ public class IoTDBThriftConnectorV1 implements PipeConnector {
     final File tsFile = pipeTsFileInsertionEvent.getTsFile();
 
     // 1. transfer file piece by piece
-    final int readFileBufferSize = PipeConfig.getInstance().getReadFileBufferSize();
+    final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
     final byte[] readBuffer = new byte[readFileBufferSize];
     long position = 0;
     try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
index 2cbcc748db7..025025e1c03 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
 import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
 import org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
@@ -54,7 +53,7 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
   private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
 
   private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
-  private static final String RECEIVE_DIR = PipeConfig.getInstance().getReceiveFileDir();
+  private static final String RECEIVER_FILE_DIR = IOTDB_CONFIG.getPipeReceiverFileDir();
 
   private File writingFile;
   private RandomAccessFile writingFileWriter;
@@ -179,11 +178,11 @@ public class IoTDBThriftReceiverV1 implements IoTDBThriftReceiver {
       writingFile = null;
     }
 
-    final File receiveDir = new File(RECEIVE_DIR);
+    final File receiveDir = new File(RECEIVER_FILE_DIR);
     if (!receiveDir.exists()) {
       boolean ignored = receiveDir.mkdirs();
     }
-    writingFile = new File(RECEIVE_DIR, fileName);
+    writingFile = new File(RECEIVER_FILE_DIR, fileName);
     writingFileWriter = new RandomAccessFile(writingFile, "rw");
     LOGGER.info(String.format("start to write transferring file %s.", writingFile.getPath()));
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index a1aeade92b6..87fd30b5a30 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.core.connector.manager;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -75,10 +76,10 @@ public class PipeConnectorSubtaskManager {
             "Failed to construct PipeConnector, because of " + e.getMessage(), e);
       }
 
-      // TODO: make pendingQueue size configurable
       // 2. construct PipeConnectorSubtaskLifeCycle to manage PipeConnectorSubtask's life cycle
       final ListenableBoundedBlockingPendingQueue<Event> pendingQueue =
-          new ListenableBoundedBlockingPendingQueue<>(1024);
+          new ListenableBoundedBlockingPendingQueue<>(
+              PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
       final PipeConnectorSubtask pipeConnectorSubtask =
           new PipeConnectorSubtask(attributeSortedString, taskMeta, pendingQueue, pipeConnector);
       final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
index 4bfe8f0bb9f..6d77b9f47af 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
@@ -20,13 +20,13 @@
 package org.apache.iotdb.db.pipe.execution.executor;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 public class PipeAssignerSubtaskExecutor extends PipeSubtaskExecutor {
 
   PipeAssignerSubtaskExecutor() {
     super(
-        IoTDBDescriptor.getInstance().getConfig().getPipeSubtaskExecutorMaxThreadNum(),
+        PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
         ThreadName.PIPE_ASSIGNER_EXECUTOR_POOL);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
index 33ba3a4210f..b6a26508ddc 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
@@ -20,13 +20,13 @@
 package org.apache.iotdb.db.pipe.execution.executor;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 public class PipeConnectorSubtaskExecutor extends PipeSubtaskExecutor {
 
   PipeConnectorSubtaskExecutor() {
     super(
-        IoTDBDescriptor.getInstance().getConfig().getPipeSubtaskExecutorMaxThreadNum(),
+        PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
         ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
index e3a1e1a7eca..f806e287fc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
@@ -20,13 +20,13 @@
 package org.apache.iotdb.db.pipe.execution.executor;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 
 public class PipeProcessorSubtaskExecutor extends PipeSubtaskExecutor {
 
   PipeProcessorSubtaskExecutor() {
     super(
-        IoTDBDescriptor.getInstance().getConfig().getPipeSubtaskExecutorMaxThreadNum(),
+        PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
         ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
index dada354e743..d7db8364a59 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.execution.scheduler;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
 
 public class PipeSubtaskScheduler {
@@ -27,14 +28,14 @@ public class PipeSubtaskScheduler {
 
   private boolean isFirstSchedule = true;
 
-  // TODO: make these two configurable
-
-  private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT = 10_000;
+  private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT =
+      PipeConfig.getInstance().getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount();
   private int consumedEventCountCheckpointInterval;
   private int consumedEventCount;
 
   // in ms
-  private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION = 10 * 1000L;
+  private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION =
+      PipeConfig.getInstance().getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration();
   private long timeDurationCheckpointInterval;
   private long lastCheckTime;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index 942ab600536..ca81f1c7643 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -20,8 +20,8 @@
 package org.apache.iotdb.db.pipe.resource.file;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
 
 import java.io.File;
 import java.io.IOException;
@@ -102,19 +102,19 @@ public class PipeFileResourceManager {
   private static String getPipeTsFileDirPath(File file) throws IOException {
     while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
         && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
-        && !file.getName().equals(PipeConfig.PIPE_TSFILE_DIR_NAME)) {
+        && !file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName())) {
       file = file.getParentFile();
     }
     return file.getParentFile().getCanonicalPath()
         + File.separator
-        + PipeConfig.PIPE_TSFILE_DIR_NAME;
+        + PipeConfig.getInstance().getPipeHardlinkTsFileDirName();
   }
 
   private static String getRelativeFilePath(File file) {
     StringBuilder builder = new StringBuilder(file.getName());
     while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
         && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
-        && !file.getName().equals(PipeConfig.PIPE_TSFILE_DIR_NAME)) {
+        && !file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName())) {
       file = file.getParentFile();
       builder =
           new StringBuilder(file.getName())
@@ -168,7 +168,7 @@ public class PipeFileResourceManager {
    * <p>this method can be only invoked when the system is booting up.
    */
   public synchronized void clear(String dataDir) {
-    File pipeTsFileDir = new File(dataDir, PipeConfig.PIPE_TSFILE_DIR_NAME);
+    File pipeTsFileDir = new File(dataDir, PipeConfig.getInstance().getPipeHardlinkTsFileDirName());
     if (pipeTsFileDir.exists()) {
       FileUtils.deleteDirectory(pipeTsFileDir);
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
index 77faae275e5..1acadceb5fd 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
@@ -19,8 +19,8 @@
 
 package org.apache.iotdb.db.pipe.resource.file;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.DirectoryFileFilter;
@@ -43,7 +43,8 @@ public class PipeHardlinkFileDirStartupCleaner {
       for (File file :
           FileUtils.listFilesAndDirs(
               new File(dataDir), DirectoryFileFilter.INSTANCE, DirectoryFileFilter.INSTANCE)) {
-        if (file.isDirectory() && file.getName().equals(PipeConfig.PIPE_TSFILE_DIR_NAME)) {
+        if (file.isDirectory()
+            && file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName())) {
           LOGGER.info(
               "pipe hardlink tsfile dir found, deleting it: {}, result: {}",
               file,
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index fb007b239f6..265bdf09234 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.queue;
 
-import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.pipe.api.event.Event;
 
 import org.slf4j.Logger;
@@ -37,7 +37,7 @@ public abstract class ListenableBlockingPendingQueue<E extends Event> {
       LoggerFactory.getLogger(ListenableBlockingPendingQueue.class);
 
   private static final long MAX_BLOCKING_TIME_MS =
-      PipeConfig.getInstance().getPendingQueueMaxBlockingTimeMs();
+      PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
 
   private final BlockingQueue<E> pendingQueue;
 
diff --git a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 4cbcb7e731f..4d7ef849fd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++ b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.pipe.task.subtask;
 
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
 import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
@@ -109,8 +110,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
           retry++;
           LOGGER.error("Failed to reconnect to the target system, retrying... ({} time(s))", retry);
           try {
-            // TODO: make the retry interval configurable
-            Thread.sleep(retry * 1000L);
+            Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
           } catch (InterruptedException interruptedException) {
             LOGGER.info(
                 "Interrupted while sleeping, perhaps need to check whether the thread is interrupted.");

Reply via email to