This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new fbc8828b81b [IOTDB-5921] Pipe: config & descriptor (#9957)
fbc8828b81b is described below
commit fbc8828b81b52165085ded01ea599dd455553058
Author: Itami Sho <[email protected]>
AuthorDate: Tue May 30 19:01:37 2023 +0800
[IOTDB-5921] Pipe: config & descriptor (#9957)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../manager/load/service/HeartbeatService.java | 8 +-
.../manager/pipe/runtime/PipeMetaSyncer.java | 8 +-
.../resources/conf/iotdb-common.properties | 54 +++++--
.../apache/iotdb/commons/conf/CommonConfig.java | 159 +++++++++++++++++++++
.../iotdb/commons/conf/CommonDescriptor.java | 87 +++++++++++
.../iotdb/commons/pipe/config/PipeConfig.java | 151 +++++++++++++++++++
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 16 +--
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 12 +-
.../db/pipe/agent/runtime/PipeAgentLauncher.java | 4 +-
.../db/pipe/agent/runtime/PipeRuntimeAgent.java | 2 +
.../apache/iotdb/db/pipe/config/PipeConfig.java | 93 ------------
.../PipeRealtimeDataRegionHybridCollector.java | 4 +-
.../realtime/assigner/DisruptorQueue.java | 5 +-
.../matcher/CachedSchemaPatternMatcher.java | 6 +-
.../impl/iotdb/v1/IoTDBThriftConnectorV1.java | 4 +-
.../impl/iotdb/v1/IoTDBThriftReceiverV1.java | 7 +-
.../manager/PipeConnectorSubtaskManager.java | 5 +-
.../executor/PipeAssignerSubtaskExecutor.java | 4 +-
.../executor/PipeConnectorSubtaskExecutor.java | 4 +-
.../executor/PipeProcessorSubtaskExecutor.java | 4 +-
.../execution/scheduler/PipeSubtaskScheduler.java | 9 +-
.../resource/file/PipeFileResourceManager.java | 10 +-
.../file/PipeHardlinkFileDirStartupCleaner.java | 5 +-
.../task/queue/ListenableBlockingPendingQueue.java | 4 +-
.../db/pipe/task/subtask/PipeConnectorSubtask.java | 4 +-
25 files changed, 507 insertions(+), 162 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index 38a6d8dc5f8..3e91b2b4013 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import
org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import
org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
@@ -124,8 +125,11 @@ public class HeartbeatService {
// We sample DataNode's load in every 10 heartbeat loop
heartbeatReq.setNeedSamplingLoad(heartbeatCounter.get() % 10 == 0);
heartbeatReq.setSchemaQuotaCount(configManager.getClusterSchemaManager().getSchemaQuotaCount());
- // We collect pipe meta in every 100 heartbeat loop, TODO: make this
configurable
- heartbeatReq.setNeedPipeMetaList(heartbeatCounter.get() % 100 == 0);
+ // We collect pipe meta once every N heartbeat loops; N is configurable (100 by default)
+ heartbeatReq.setNeedPipeMetaList(
+ heartbeatCounter.get()
+ %
PipeConfig.getInstance().getHeartbeatLoopCyclesForCollectingPipeMeta()
+ == 0);
if (!configManager.getClusterQuotaManager().hasSpaceQuotaLimit()) {
heartbeatReq.setSchemaRegionIds(configManager.getClusterQuotaManager().getSchemaRegionIds());
heartbeatReq.setDataRegionIds(configManager.getClusterQuotaManager().getDataRegionIds());
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
index 4d6ebb04ff0..2251d0d7892 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/runtime/PipeMetaSyncer.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -40,9 +41,10 @@ public class PipeMetaSyncer {
private static final ScheduledExecutorService SYNC_EXECUTOR =
IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
ThreadName.PIPE_META_SYNC_SERVICE.getName());
- // TODO: make these configurable
- private static final long INITIAL_SYNC_DELAY_MINUTES = 3;
- private static final long SYNC_INTERVAL_MINUTES = 3;
+ private static final long INITIAL_SYNC_DELAY_MINUTES =
+ PipeConfig.getInstance().getPipeMetaSyncerInitialSyncDelayMinutes();
+ private static final long SYNC_INTERVAL_MINUTES =
+ PipeConfig.getInstance().getPipeMetaSyncerSyncIntervalMinutes();
private final ConfigManager configManager;
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties
b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index e21a6b39118..edbee060d08 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -963,7 +963,7 @@ cluster_name=defaultCluster
# continuous_query_min_every_interval_in_ms=1000
####################
-### PIPE Configuration
+### Pipe Configuration
####################
# Uncomment the following field to configure the pipe lib directory.
@@ -975,18 +975,52 @@ cluster_name=defaultCluster
# If its prefix is "/", then the path is absolute. Otherwise, it is relative.
# pipe_lib_dir=ext/pipe
+# The name of the directory that stores the tsfiles temporarily held or
generated by the pipe module.
+# The directory is located in the data directory of IoTDB.
+# pipe_hardlink_tsfile_dir_name=pipe
+
# The maximum number of threads that can be used to execute the pipe subtasks
in PipeSubtaskExecutor.
-# pipe_max_thread_num = 5
+# pipe_subtask_executor_max_thread_num=5
-# White IP list of Sync client.
-# Please use the form of IPv4 network segment to present the range of IP, for
example: 192.168.0.0/16
-# If there are more than one IP segment, please separate them by commas
-# The default is to reject all IP to sync except 127.0.0.1
-# Datatype: String
-# ip_white_list=127.0.0.1/32
+# The number of events that need to be consumed before a checkpoint is
triggered.
+#
pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count=10000
+
+# The time duration (in milliseconds) between checkpoints.
+# pipe_subtask_executor_basic_check_point_interval_by_time_duration=10000
+
+# The maximum blocking time (in milliseconds) for the pending queue.
+# pipe_subtask_executor_pending_queue_max_blocking_time_ms=1000
+
+# The default size of ring buffer in the realtime collector's disruptor queue.
+# pipe_collector_assigner_disruptor_ring_buffer_size=65536
+
+# The maximum number of entries the deviceToCollectorsCache can hold.
+# pipe_collector_matcher_cache_size=1024
+
+# The capacity for the number of tablet events that can be stored in the
pending queue of the Hybrid Realtime Collector.
+# pipe_collector_pending_queue_capacity=128
+
+# The limit for the number of tablet events that can be held in the pending
queue of the Hybrid Realtime Collector.
+# Note: this should be less than or equal to
pipe_collector_pending_queue_capacity
+# pipe_collector_pending_queue_tablet_limit=64
+
+# The buffer size used for reading file during file transfer.
+# pipe_connector_read_file_buffer_size=8388608
+
+# The delay period (in milliseconds) between each retry when a connection
failure occurs.
+# pipe_connector_retry_interval_ms=1000
+
+# The size of the pending queue for the PipeConnector to store the events.
+# pipe_connector_pending_queue_size=1024
+
+# The number of heartbeat loop cycles before collecting pipe meta once
+# pipe_heartbeat_loop_cycles_for_collecting_pipe_meta=100
+
+# The initial delay (in minutes) before starting the PipeMetaSyncer service.
+# pipe_meta_syncer_initial_sync_delay_minutes=3
-# The maximum number of retry when syncing a file to receiver fails.
-# max_number_of_sync_file_retry=5
+# The regular sync interval (in minutes) for the PipeMetaSyncer service.
+# pipe_meta_syncer_sync_interval_minutes=3
####################
### RatisConsensus Configuration
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index a44810612f2..e5a9fae2d8d 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -130,6 +130,35 @@ public class CommonConfig {
private String timestampPrecision = "ms";
+ /** Pipe related */
+
+ /**
+ * The name of the directory that stores the tsfiles temporarily held or
generated by the pipe
+ * module. The directory is located in the data directory of IoTDB.
+ */
+ private String pipeHardlinkTsFileDirName = "pipe";
+
+ /** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor */
+ private int pipeSubtaskExecutorMaxThreadNum = 5;
+
+ private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
10_000;
+ private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 *
1000L;
+ private long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs = 1000;
+
+ private int pipeCollectorAssignerDisruptorRingBufferSize = 65536;
+ private int pipeCollectorMatcherCacheSize = 1024;
+ private int pipeCollectorPendingQueueCapacity = 128;
+ // this should be less than or equal to
pipeCollectorPendingQueueCapacity
+ private int pipeCollectorPendingQueueTabletLimit =
pipeCollectorPendingQueueCapacity / 2;
+
+ private int pipeConnectorReadFileBufferSize = 8388608;
+ private long pipeConnectorRetryIntervalMs = 1000L;
+ private int pipeConnectorPendingQueueSize = 1024;
+
+ private int pipeHeartbeatLoopCyclesForCollectingPipeMeta = 100;
+ private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
+ private long pipeMetaSyncerSyncIntervalMinutes = 3;
+
CommonConfig() {}
public void updatePath(String homeDir) {
@@ -390,4 +419,134 @@ public class CommonConfig {
public String getTimestampPrecision() {
return timestampPrecision;
}
+
+ public String getPipeHardlinkTsFileDirName() {
+ return pipeHardlinkTsFileDirName;
+ }
+
+ public void setPipeHardlinkTsFileDirName(String pipeHardlinkTsFileDirName) {
+ this.pipeHardlinkTsFileDirName = pipeHardlinkTsFileDirName;
+ }
+
+ public int getPipeCollectorAssignerDisruptorRingBufferSize() {
+ return pipeCollectorAssignerDisruptorRingBufferSize;
+ }
+
+ public void setPipeCollectorAssignerDisruptorRingBufferSize(
+ int pipeCollectorAssignerDisruptorRingBufferSize) {
+ this.pipeCollectorAssignerDisruptorRingBufferSize =
+ pipeCollectorAssignerDisruptorRingBufferSize;
+ }
+
+ public int getPipeCollectorMatcherCacheSize() {
+ return pipeCollectorMatcherCacheSize;
+ }
+
+ public void setPipeCollectorMatcherCacheSize(int
pipeCollectorMatcherCacheSize) {
+ this.pipeCollectorMatcherCacheSize = pipeCollectorMatcherCacheSize;
+ }
+
+ public int getPipeCollectorPendingQueueCapacity() {
+ return pipeCollectorPendingQueueCapacity;
+ }
+
+ public void setPipeCollectorPendingQueueCapacity(int
pipeCollectorPendingQueueCapacity) {
+ this.pipeCollectorPendingQueueCapacity = pipeCollectorPendingQueueCapacity;
+ }
+
+ public int getPipeCollectorPendingQueueTabletLimit() {
+ return pipeCollectorPendingQueueTabletLimit;
+ }
+
+ public void setPipeCollectorPendingQueueTabletLimit(int
pipeCollectorPendingQueueTabletLimit) {
+ this.pipeCollectorPendingQueueTabletLimit =
pipeCollectorPendingQueueTabletLimit;
+ }
+
+ public int getPipeConnectorReadFileBufferSize() {
+ return pipeConnectorReadFileBufferSize;
+ }
+
+ public void setPipeConnectorReadFileBufferSize(int
pipeConnectorReadFileBufferSize) {
+ this.pipeConnectorReadFileBufferSize = pipeConnectorReadFileBufferSize;
+ }
+
+ public int getPipeHeartbeatLoopCyclesForCollectingPipeMeta() {
+ return pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+ }
+
+ public void setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
+ int pipeHeartbeatLoopCyclesForCollectingPipeMeta) {
+ this.pipeHeartbeatLoopCyclesForCollectingPipeMeta =
+ pipeHeartbeatLoopCyclesForCollectingPipeMeta;
+ }
+
+ public long getPipeMetaSyncerInitialSyncDelayMinutes() {
+ return pipeMetaSyncerInitialSyncDelayMinutes;
+ }
+
+ public void setPipeMetaSyncerInitialSyncDelayMinutes(long
pipeMetaSyncerInitialSyncDelayMinutes) {
+ this.pipeMetaSyncerInitialSyncDelayMinutes =
pipeMetaSyncerInitialSyncDelayMinutes;
+ }
+
+ public long getPipeMetaSyncerSyncIntervalMinutes() {
+ return pipeMetaSyncerSyncIntervalMinutes;
+ }
+
+ public void setPipeMetaSyncerSyncIntervalMinutes(long
pipeMetaSyncerSyncIntervalMinutes) {
+ this.pipeMetaSyncerSyncIntervalMinutes = pipeMetaSyncerSyncIntervalMinutes;
+ }
+
+ public long getPipeConnectorRetryIntervalMs() {
+ return pipeConnectorRetryIntervalMs;
+ }
+
+ public void setPipeConnectorRetryIntervalMs(long
pipeConnectorRetryIntervalMs) {
+ this.pipeConnectorRetryIntervalMs = pipeConnectorRetryIntervalMs;
+ }
+
+ public int getPipeConnectorPendingQueueSize() {
+ return pipeConnectorPendingQueueSize;
+ }
+
+ public void setPipeConnectorPendingQueueSize(int
pipeConnectorPendingQueueSize) {
+ this.pipeConnectorPendingQueueSize = pipeConnectorPendingQueueSize;
+ }
+
+ public int
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
+ return pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
+ }
+
+ public void
setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
+ int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount) {
+ this.pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount =
+ pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount;
+ }
+
+ public long getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration() {
+ return pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration;
+ }
+
+ public void setPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration(
+ long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration) {
+ this.pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration =
+ pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration;
+ }
+
+ public int getPipeSubtaskExecutorMaxThreadNum() {
+ return pipeSubtaskExecutorMaxThreadNum;
+ }
+
+ public void setPipeSubtaskExecutorMaxThreadNum(int
pipeSubtaskExecutorMaxThreadNum) {
+ this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum;
+ }
+
+ public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
+ return pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
+ }
+
+ public void setPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(
+ long pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs) {
+ this.pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs =
+ pipeSubtaskExecutorPendingQueueMaxBlockingTimeMs;
+ }
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index ef947180567..bd72a56ae88 100644
---
a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -207,6 +207,8 @@ public class CommonDescriptor {
properties.getProperty(
"target_ml_node_endpoint",
NodeUrlUtils.convertTEndPointUrl(config.getTargetMLNodeEndPoint()));
+
+ loadPipeProps(properties);
try {
config.setTargetMLNodeEndPoint(NodeUrlUtils.parseTEndPointUrl(endPointUrl));
} catch (BadNodeUrlException e) {
@@ -216,6 +218,91 @@ public class CommonDescriptor {
}
}
+ private void loadPipeProps(Properties properties) {
+ config.setPipeHardlinkTsFileDirName(
+ properties.getProperty(
+ "pipe_hardlink_tsfile_dir_name",
config.getPipeHardlinkTsFileDirName()));
+
+ config.setPipeSubtaskExecutorMaxThreadNum(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_subtask_executor_max_thread_num",
+
Integer.toString(config.getPipeSubtaskExecutorMaxThreadNum()))));
+ if (config.getPipeSubtaskExecutorMaxThreadNum() <= 0) {
+ config.setPipeSubtaskExecutorMaxThreadNum(5);
+ }
+ config.setPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount(
+ Integer.parseInt(
+ properties.getProperty(
+
"pipe_subtask_executor_basic_check_point_interval_by_consumed_event_count",
+ String.valueOf(
+
config.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount()))));
+ config.setPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration(
+ Integer.parseInt(
+ properties.getProperty(
+
"pipe_subtask_executor_basic_check_point_interval_by_time_duration",
+ String.valueOf(
+
config.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration()))));
+ config.setPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_subtask_executor_pending_queue_max_blocking_time_ms",
+
String.valueOf(config.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs()))));
+
+ config.setPipeCollectorAssignerDisruptorRingBufferSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_collector_assigner_disruptor_ring_buffer_size",
+
String.valueOf(config.getPipeCollectorAssignerDisruptorRingBufferSize()))));
+ config.setPipeCollectorMatcherCacheSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_collector_matcher_cache_size",
+ String.valueOf(config.getPipeCollectorMatcherCacheSize()))));
+ config.setPipeCollectorPendingQueueCapacity(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_collector_pending_queue_capacity",
+
String.valueOf(config.getPipeCollectorPendingQueueCapacity()))));
+ config.setPipeCollectorPendingQueueTabletLimit(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_collector_pending_queue_tablet_limit",
+
String.valueOf(config.getPipeCollectorPendingQueueTabletLimit()))));
+
+ config.setPipeConnectorReadFileBufferSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_connector_read_file_buffer_size",
+ String.valueOf(config.getPipeConnectorReadFileBufferSize()))));
+ config.setPipeConnectorRetryIntervalMs(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_connector_retry_interval_ms",
+ String.valueOf(config.getPipeConnectorRetryIntervalMs()))));
+ config.setPipeConnectorPendingQueueSize(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_connector_pending_queue_size",
+ String.valueOf(config.getPipeConnectorPendingQueueSize()))));
+
+ config.setPipeHeartbeatLoopCyclesForCollectingPipeMeta(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_heartbeat_loop_cycles_for_collecting_pipe_meta",
+
String.valueOf(config.getPipeHeartbeatLoopCyclesForCollectingPipeMeta()))));
+ config.setPipeMetaSyncerInitialSyncDelayMinutes(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_meta_syncer_initial_sync_delay_minutes",
+
String.valueOf(config.getPipeMetaSyncerInitialSyncDelayMinutes()))));
+ config.setPipeMetaSyncerSyncIntervalMinutes(
+ Integer.parseInt(
+ properties.getProperty(
+ "pipe_meta_syncer_sync_interval_minutes",
+
String.valueOf(config.getPipeMetaSyncerSyncIntervalMinutes()))));
+ }
+
public void loadGlobalConfig(TGlobalConfig globalConfig) {
config.setDiskSpaceWarningThreshold(globalConfig.getDiskSpaceWarningThreshold());
}
diff --git
a/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
new file mode 100644
index 00000000000..365ea0b8003
--- /dev/null
+++
b/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.pipe.config;
+
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PipeConfig {
+
+ private final CommonConfig COMMON_CONFIG =
CommonDescriptor.getInstance().getConfig();
+
+ /////////////////////////////// File ///////////////////////////////
+
+ public String getPipeHardlinkTsFileDirName() {
+ return COMMON_CONFIG.getPipeHardlinkTsFileDirName();
+ }
+
+ /////////////////////////////// Subtask Executor
///////////////////////////////
+
+ public int getPipeSubtaskExecutorMaxThreadNum() {
+ return COMMON_CONFIG.getPipeSubtaskExecutorMaxThreadNum();
+ }
+
+ public int
getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount() {
+ return
COMMON_CONFIG.getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount();
+ }
+
+ public long getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration() {
+ return
COMMON_CONFIG.getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration();
+ }
+
+ public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
+ return COMMON_CONFIG.getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
+ }
+
+ /////////////////////////////// Collector ///////////////////////////////
+
+ public int getPipeCollectorAssignerDisruptorRingBufferSize() {
+ return COMMON_CONFIG.getPipeCollectorAssignerDisruptorRingBufferSize();
+ }
+
+ public int getPipeCollectorMatcherCacheSize() {
+ return COMMON_CONFIG.getPipeCollectorMatcherCacheSize();
+ }
+
+ public int getPipeCollectorPendingQueueCapacity() {
+ return COMMON_CONFIG.getPipeCollectorPendingQueueCapacity();
+ }
+
+ public int getPipeCollectorPendingQueueTabletLimit() {
+ return COMMON_CONFIG.getPipeCollectorPendingQueueTabletLimit();
+ }
+
+ /////////////////////////////// Connector ///////////////////////////////
+
+ public int getPipeConnectorReadFileBufferSize() {
+ return COMMON_CONFIG.getPipeConnectorReadFileBufferSize();
+ }
+
+ public long getPipeConnectorRetryIntervalMs() {
+ return COMMON_CONFIG.getPipeConnectorRetryIntervalMs();
+ }
+
+ public int getPipeConnectorPendingQueueSize() {
+ return COMMON_CONFIG.getPipeConnectorPendingQueueSize();
+ }
+
+ /////////////////////////////// Meta Consistency
///////////////////////////////
+
+ public int getHeartbeatLoopCyclesForCollectingPipeMeta() {
+ return COMMON_CONFIG.getPipeHeartbeatLoopCyclesForCollectingPipeMeta();
+ }
+
+ public long getPipeMetaSyncerInitialSyncDelayMinutes() {
+ return COMMON_CONFIG.getPipeMetaSyncerInitialSyncDelayMinutes();
+ }
+
+ public long getPipeMetaSyncerSyncIntervalMinutes() {
+ return COMMON_CONFIG.getPipeMetaSyncerSyncIntervalMinutes();
+ }
+
+ /////////////////////////////// Utils ///////////////////////////////
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeConfig.class);
+
+ public void printAllConfigs() {
+ LOGGER.info("PipeHardlinkTsFileDirName: {}",
getPipeHardlinkTsFileDirName());
+
+ LOGGER.info("PipeSubtaskExecutorMaxThreadNum: {}",
getPipeSubtaskExecutorMaxThreadNum());
+ LOGGER.info(
+ "PipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount: {}",
+ getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount());
+ LOGGER.info(
+ "PipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration: {}",
+ getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration());
+ LOGGER.info(
+ "PipeSubtaskExecutorPendingQueueMaxBlockingTimeMs: {}",
+ getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs());
+
+ LOGGER.info(
+ "PipeCollectorAssignerDisruptorRingBufferSize: {}",
+ getPipeCollectorAssignerDisruptorRingBufferSize());
+ LOGGER.info("PipeCollectorMatcherCacheSize: {}",
getPipeCollectorMatcherCacheSize());
+ LOGGER.info("PipeCollectorPendingQueueCapacity: {}",
getPipeCollectorPendingQueueCapacity());
+ LOGGER.info(
+ "PipeCollectorPendingQueueTabletLimit: {}",
getPipeCollectorPendingQueueTabletLimit());
+
+ LOGGER.info("PipeConnectorReadFileBufferSize: {}",
getPipeConnectorReadFileBufferSize());
+ LOGGER.info("PipeConnectorRetryIntervalMs: {}",
getPipeConnectorRetryIntervalMs());
+ LOGGER.info("PipeConnectorPendingQueueSize: {}",
getPipeConnectorPendingQueueSize());
+
+ LOGGER.info(
+ "HeartbeatLoopCyclesForCollectingPipeMeta: {}",
+ getHeartbeatLoopCyclesForCollectingPipeMeta());
+ LOGGER.info(
+ "PipeMetaSyncerInitialSyncDelayMinutes: {}",
getPipeMetaSyncerInitialSyncDelayMinutes());
+ LOGGER.info("PipeMetaSyncerSyncIntervalMinutes: {}",
getPipeMetaSyncerSyncIntervalMinutes());
+ }
+
+ /////////////////////////////// Singleton ///////////////////////////////
+
+ private PipeConfig() {}
+
+ public static PipeConfig getInstance() {
+ return PipeConfigHolder.INSTANCE;
+ }
+
+ private static class PipeConfigHolder {
+ private static final PipeConfig INSTANCE = new PipeConfig();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 48c3e0f853d..06fcdb54b34 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1120,8 +1120,8 @@ public class IoTDBConfig {
private int maxPendingBatchesNum = 12;
private double maxMemoryRatioForQueue = 0.6;
- /** The maximum number of threads that can be used to execute subtasks in
PipeSubtaskExecutor */
- private int pipeMaxThreadNum = 5;
+ /** Pipe related */
+ private String pipeReceiveFileDir = systemDir + File.separator + "pipe";
/** Resource control */
private boolean quotaEnable = false;
@@ -1541,11 +1541,11 @@ public class IoTDBConfig {
this.triggerTemporaryLibDir = triggerDir + File.separator +
IoTDBConstant.TMP_FOLDER_NAME;
}
- public String getPipeDir() {
+ public String getPipeLibDir() {
return pipeDir;
}
- public void setPipeDir(String pipeDir) {
+ public void setPipeLibDir(String pipeDir) {
this.pipeDir = pipeDir;
updatePipeTemporaryLibDir();
}
@@ -3886,12 +3886,12 @@ public class IoTDBConfig {
return modeMapSizeThreshold;
}
- public void setPipeSubtaskExecutorMaxThreadNum(int pipeMaxThreadNum) {
- this.pipeMaxThreadNum = pipeMaxThreadNum;
+ public void setPipeReceiverFileDir(String pipeReceiveFileDir) {
+ this.pipeReceiveFileDir = pipeReceiveFileDir;
}
- public int getPipeSubtaskExecutorMaxThreadNum() {
- return pipeMaxThreadNum;
+ public String getPipeReceiverFileDir() {
+ return pipeReceiveFileDir;
}
public boolean isQuotaEnable() {
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 040ca1938d9..be0df79e585 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -1952,16 +1952,10 @@ public class IoTDBDescriptor {
}
private void loadPipeProps(Properties properties) {
- conf.setPipeDir(properties.getProperty("pipe_lib_dir", conf.getPipeDir()));
+ conf.setPipeLibDir(properties.getProperty("pipe_lib_dir",
conf.getPipeLibDir()));
- conf.setPipeSubtaskExecutorMaxThreadNum(
- Integer.parseInt(
- properties.getProperty(
- "pipe_max_thread_num",
- Integer.toString(conf.getPipeSubtaskExecutorMaxThreadNum()))));
- if (conf.getPipeSubtaskExecutorMaxThreadNum() <= 0) {
- conf.setPipeSubtaskExecutorMaxThreadNum(5);
- }
+ conf.setPipeReceiverFileDir(
+ properties.getProperty("pipe_receiver_file_dir",
conf.getPipeReceiverFileDir()));
}
private void loadCQProps(Properties properties) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
index 12fb4a66152..adf15709aff 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java
@@ -98,8 +98,8 @@ class PipeAgentLauncher {
private static void initPipePluginRelatedInstances() throws StartupException
{
try {
PipePluginExecutableManager.setupAndGetInstance(
- IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeDir());
-
PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeDir());
+ IOTDB_CONFIG.getPipeTemporaryLibDir(), IOTDB_CONFIG.getPipeLibDir());
+
PipePluginClassLoaderManager.setupAndGetInstance(IOTDB_CONFIG.getPipeLibDir());
} catch (IOException e) {
throw new StartupException(e);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
index 5c097b0c444..2c1d5cc85b5 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.agent.runtime;
import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
@@ -54,6 +55,7 @@ public class PipeRuntimeAgent implements IService {
@Override
public synchronized void start() throws StartupException {
+ PipeConfig.getInstance().printAllConfigs();
PipeAgentLauncher.launchPipeTaskAgent();
isShutdown.set(false);
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
b/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
deleted file mode 100644
index 31eff23bc1b..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/pipe/config/PipeConfig.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.db.pipe.config;
-
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
-import java.io.File;
-
-// TODO: make these parameters configurable
-// TODO: make all pipe related parameters in one place
-// TODO: set the default value of the parameters in IoTDBDescriptor
-// TODO: move it to common module?
-public class PipeConfig {
-
- /**
- * The name of the directory that stores the tsfiles temporarily hold or generated by the pipe
- * module. The directory is located in the data directory of IoTDB.
- */
- public static final String PIPE_TSFILE_DIR_NAME = "pipe";
-
- private final int defaultRingBufferSize = 65536;
-
- private final int matcherCacheSize = 1024;
-
- private final int realtimeCollectorPendingQueueCapacity = 65536;
-
- // this should be less than or equals to realtimeCollectorPendingQueueCapacity
- private final int realtimeCollectorPendingQueueTabletLimit =
- realtimeCollectorPendingQueueCapacity / 2;
-
- private final int readFileBufferSize = 8388608;
-
- private final long pendingQueueMaxBlockingTimeMs = 1000;
-
- public int getDefaultRingBufferSize() {
- return defaultRingBufferSize;
- }
-
- public int getMatcherCacheSize() {
- return matcherCacheSize;
- }
-
- public int getRealtimeCollectorPendingQueueCapacity() {
- return realtimeCollectorPendingQueueCapacity;
- }
-
- public int getRealtimeCollectorPendingQueueTabletLimit() {
- return realtimeCollectorPendingQueueTabletLimit;
- }
-
- public String getReceiveFileDir() {
- return IoTDBDescriptor.getInstance().getConfig().getSystemDir()
- + File.separator
- + "pipe"; // TODO: replace with resource manager
- }
-
- public int getReadFileBufferSize() {
- return readFileBufferSize;
- }
-
- public long getPendingQueueMaxBlockingTimeMs() {
- return pendingQueueMaxBlockingTimeMs;
- }
-
- /////////////////////////////// Singleton ///////////////////////////////
-
- private PipeConfig() {}
-
- public static PipeConfig getInstance() {
- return PipeConfigHolder.INSTANCE;
- }
-
- private static class PipeConfigHolder {
- private static final PipeConfig INSTANCE = new PipeConfig();
- }
-}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
index abb3a109c2c..7ca784dd3ce 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/PipeRealtimeDataRegionHybridCollector.java
@@ -19,9 +19,9 @@
package org.apache.iotdb.db.pipe.core.collector.realtime;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.db.pipe.core.event.realtime.TsFileEpoch;
import
org.apache.iotdb.db.pipe.task.queue.ListenableUnboundedBlockingPendingQueue;
@@ -111,7 +111,7 @@ public class PipeRealtimeDataRegionHybridCollector extends
PipeRealtimeDataRegio
private boolean isApproachingCapacity() {
return pendingQueue.size()
- >= PipeConfig.getInstance().getRealtimeCollectorPendingQueueTabletLimit();
+ >= PipeConfig.getInstance().getPipeCollectorPendingQueueTabletLimit();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
index 163b450021c..f0dc0a461be 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/assigner/DisruptorQueue.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime.assigner;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
@@ -48,7 +48,8 @@ public class DisruptorQueue<E> {
}
public static class Builder<E> {
- private int ringBufferSize = PipeConfig.getInstance().getDefaultRingBufferSize();
+ private int ringBufferSize =
+ PipeConfig.getInstance().getPipeCollectorAssignerDisruptorRingBufferSize();
private ThreadFactory threadFactory = DaemonThreadFactory.INSTANCE;
private ProducerType producerType = ProducerType.MULTI;
private WaitStrategy waitStrategy = new BlockingWaitStrategy();
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
index c8429331730..b68a7e99c0f 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/collector/realtime/matcher/CachedSchemaPatternMatcher.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.core.collector.realtime.matcher;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.db.pipe.core.collector.realtime.PipeRealtimeDataRegionCollector;
import org.apache.iotdb.db.pipe.core.event.realtime.PipeRealtimeCollectEvent;
import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
@@ -47,7 +47,9 @@ public class CachedSchemaPatternMatcher implements
PipeDataRegionMatcher {
this.lock = new ReentrantReadWriteLock();
this.collectors = new HashSet<>();
this.deviceToCollectorsCache =
- Caffeine.newBuilder().maximumSize(PipeConfig.getInstance().getMatcherCacheSize()).build();
+ Caffeine.newBuilder()
+ .maximumSize(PipeConfig.getInstance().getPipeCollectorMatcherCacheSize())
+ .build();
}
@Override
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
index 65837f31d42..d320366552e 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftConnectorV1.java
@@ -22,9 +22,9 @@ package org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1;
import org.apache.iotdb.commons.client.property.ThriftClientProperty;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.config.PipeConnectorConstant;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorClient;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
@@ -170,7 +170,7 @@ public class IoTDBThriftConnectorV1 implements
PipeConnector {
final File tsFile = pipeTsFileInsertionEvent.getTsFile();
// 1. transfer file piece by piece
- final int readFileBufferSize = PipeConfig.getInstance().getReadFileBufferSize();
+ final int readFileBufferSize = PipeConfig.getInstance().getPipeConnectorReadFileBufferSize();
final byte[] readBuffer = new byte[readFileBufferSize];
long position = 0;
try (final RandomAccessFile reader = new RandomAccessFile(tsFile, "r")) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
index 2cbcc748db7..025025e1c03 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/impl/iotdb/v1/IoTDBThriftReceiverV1.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.db.mpp.plan.execution.ExecutionResult;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.pipe.agent.receiver.IoTDBThriftReceiver;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.IoTDBThriftConnectorVersion;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.reponse.PipeTransferFilePieceResp;
import
org.apache.iotdb.db.pipe.core.connector.impl.iotdb.v1.request.PipeTransferFilePieceReq;
@@ -54,7 +53,7 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBThriftReceiverV1.class);
private static final IoTDBConfig IOTDB_CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private static final String RECEIVE_DIR = PipeConfig.getInstance().getReceiveFileDir();
+ private static final String RECEIVER_FILE_DIR = IOTDB_CONFIG.getPipeReceiverFileDir();
private File writingFile;
private RandomAccessFile writingFileWriter;
@@ -179,11 +178,11 @@ public class IoTDBThriftReceiverV1 implements
IoTDBThriftReceiver {
writingFile = null;
}
- final File receiveDir = new File(RECEIVE_DIR);
+ final File receiveDir = new File(RECEIVER_FILE_DIR);
if (!receiveDir.exists()) {
boolean ignored = receiveDir.mkdirs();
}
- writingFile = new File(RECEIVE_DIR, fileName);
+ writingFile = new File(RECEIVER_FILE_DIR, fileName);
writingFileWriter = new RandomAccessFile(writingFile, "rw");
LOGGER.info(String.format("start to write transferring file %s.",
writingFile.getPath()));
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
index a1aeade92b6..87fd30b5a30 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/core/connector/manager/PipeConnectorSubtaskManager.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.core.connector.manager;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.plugin.builtin.BuiltinPipePlugin;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
@@ -75,10 +76,10 @@ public class PipeConnectorSubtaskManager {
"Failed to construct PipeConnector, because of " + e.getMessage(),
e);
}
- // TODO: make pendingQueue size configurable
// 2. construct PipeConnectorSubtaskLifeCycle to manage
PipeConnectorSubtask's life cycle
final ListenableBoundedBlockingPendingQueue<Event> pendingQueue =
- new ListenableBoundedBlockingPendingQueue<>(1024);
+ new ListenableBoundedBlockingPendingQueue<>(
+ PipeConfig.getInstance().getPipeConnectorPendingQueueSize());
final PipeConnectorSubtask pipeConnectorSubtask =
new PipeConnectorSubtask(attributeSortedString, taskMeta,
pendingQueue, pipeConnector);
final PipeConnectorSubtaskLifeCycle pipeConnectorSubtaskLifeCycle =
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
index 4bfe8f0bb9f..6d77b9f47af 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeAssignerSubtaskExecutor.java
@@ -20,13 +20,13 @@
package org.apache.iotdb.db.pipe.execution.executor;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
public class PipeAssignerSubtaskExecutor extends PipeSubtaskExecutor {
PipeAssignerSubtaskExecutor() {
super(
- IoTDBDescriptor.getInstance().getConfig().getPipeSubtaskExecutorMaxThreadNum(),
+ PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
ThreadName.PIPE_ASSIGNER_EXECUTOR_POOL);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
index 33ba3a4210f..b6a26508ddc 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeConnectorSubtaskExecutor.java
@@ -20,13 +20,13 @@
package org.apache.iotdb.db.pipe.execution.executor;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
public class PipeConnectorSubtaskExecutor extends PipeSubtaskExecutor {
PipeConnectorSubtaskExecutor() {
super(
- IoTDBDescriptor.getInstance().getConfig().getPipeSubtaskExecutorMaxThreadNum(),
+ PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
ThreadName.PIPE_CONNECTOR_EXECUTOR_POOL);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
index e3a1e1a7eca..f806e287fc4 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/executor/PipeProcessorSubtaskExecutor.java
@@ -20,13 +20,13 @@
package org.apache.iotdb.db.pipe.execution.executor;
import org.apache.iotdb.commons.concurrent.ThreadName;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
public class PipeProcessorSubtaskExecutor extends PipeSubtaskExecutor {
PipeProcessorSubtaskExecutor() {
super(
- IoTDBDescriptor.getInstance().getConfig().getPipeSubtaskExecutorMaxThreadNum(),
+ PipeConfig.getInstance().getPipeSubtaskExecutorMaxThreadNum(),
ThreadName.PIPE_PROCESSOR_EXECUTOR_POOL);
}
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
index dada354e743..d7db8364a59 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/execution/scheduler/PipeSubtaskScheduler.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.execution.scheduler;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.pipe.execution.executor.PipeSubtaskExecutor;
public class PipeSubtaskScheduler {
@@ -27,14 +28,14 @@ public class PipeSubtaskScheduler {
private boolean isFirstSchedule = true;
- // TODO: make these two configurable
-
- private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT = 10_000;
+ private static final int BASIC_CHECKPOINT_INTERVAL_BY_CONSUMED_EVENT_COUNT =
+ PipeConfig.getInstance().getPipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount();
private int consumedEventCountCheckpointInterval;
private int consumedEventCount;
// in ms
- private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION = 10 * 1000L;
+ private static final long BASIC_CHECKPOINT_INTERVAL_BY_TIME_DURATION =
+ PipeConfig.getInstance().getPipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration();
private long timeDurationCheckpointInterval;
private long lastCheckTime;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
index 942ab600536..ca81f1c7643 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeFileResourceManager.java
@@ -20,8 +20,8 @@
package org.apache.iotdb.db.pipe.resource.file;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.utils.FileUtils;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
import java.io.File;
import java.io.IOException;
@@ -102,19 +102,19 @@ public class PipeFileResourceManager {
private static String getPipeTsFileDirPath(File file) throws IOException {
while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
&& !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
- && !file.getName().equals(PipeConfig.PIPE_TSFILE_DIR_NAME)) {
+ && !file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName())) {
file = file.getParentFile();
}
return file.getParentFile().getCanonicalPath()
+ File.separator
- + PipeConfig.PIPE_TSFILE_DIR_NAME;
+ + PipeConfig.getInstance().getPipeHardlinkTsFileDirName();
}
private static String getRelativeFilePath(File file) {
StringBuilder builder = new StringBuilder(file.getName());
while (!file.getName().equals(IoTDBConstant.SEQUENCE_FLODER_NAME)
&& !file.getName().equals(IoTDBConstant.UNSEQUENCE_FLODER_NAME)
- && !file.getName().equals(PipeConfig.PIPE_TSFILE_DIR_NAME)) {
+ && !file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName())) {
file = file.getParentFile();
builder =
new StringBuilder(file.getName())
@@ -168,7 +168,7 @@ public class PipeFileResourceManager {
* <p>this method can be only invoked when the system is booting up.
*/
public synchronized void clear(String dataDir) {
- File pipeTsFileDir = new File(dataDir, PipeConfig.PIPE_TSFILE_DIR_NAME);
+ File pipeTsFileDir = new File(dataDir, PipeConfig.getInstance().getPipeHardlinkTsFileDirName());
if (pipeTsFileDir.exists()) {
FileUtils.deleteDirectory(pipeTsFileDir);
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
index 77faae275e5..1acadceb5fd 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/resource/file/PipeHardlinkFileDirStartupCleaner.java
@@ -19,8 +19,8 @@
package org.apache.iotdb.db.pipe.resource.file;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
@@ -43,7 +43,8 @@ public class PipeHardlinkFileDirStartupCleaner {
for (File file :
FileUtils.listFilesAndDirs(
new File(dataDir), DirectoryFileFilter.INSTANCE,
DirectoryFileFilter.INSTANCE)) {
- if (file.isDirectory() && file.getName().equals(PipeConfig.PIPE_TSFILE_DIR_NAME)) {
+ if (file.isDirectory()
+ && file.getName().equals(PipeConfig.getInstance().getPipeHardlinkTsFileDirName())) {
LOGGER.info(
"pipe hardlink tsfile dir found, deleting it: {}, result: {}",
file,
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
index fb007b239f6..265bdf09234 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/queue/ListenableBlockingPendingQueue.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.task.queue;
-import org.apache.iotdb.db.pipe.config.PipeConfig;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.pipe.api.event.Event;
import org.slf4j.Logger;
@@ -37,7 +37,7 @@ public abstract class ListenableBlockingPendingQueue<E
extends Event> {
LoggerFactory.getLogger(ListenableBlockingPendingQueue.class);
private static final long MAX_BLOCKING_TIME_MS =
- PipeConfig.getInstance().getPendingQueueMaxBlockingTimeMs();
+ PipeConfig.getInstance().getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs();
private final BlockingQueue<E> pendingQueue;
diff --git
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
index 4cbcb7e731f..4d7ef849fd6 100644
---
a/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
+++
b/server/src/main/java/org/apache/iotdb/db/pipe/task/subtask/PipeConnectorSubtask.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.task.subtask;
+import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.task.meta.PipeTaskMeta;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import
org.apache.iotdb.db.pipe.task.queue.ListenableBoundedBlockingPendingQueue;
@@ -109,8 +110,7 @@ public class PipeConnectorSubtask extends PipeSubtask {
retry++;
LOGGER.error("Failed to reconnect to the target system, retrying...
({} time(s))", retry);
try {
- // TODO: make the retry interval configurable
- Thread.sleep(retry * 1000L);
+ Thread.sleep(retry * PipeConfig.getInstance().getPipeConnectorRetryIntervalMs());
} catch (InterruptedException interruptedException) {
LOGGER.info(
"Interrupted while sleeping, perhaps need to check whether the
thread is interrupted.");