This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch pipe-reduce-parameters in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bac3e0835a867af12cf9537a9d52c5ce6102d75c Author: Steve Yurong Su <[email protected]> AuthorDate: Mon Aug 14 17:11:34 2023 +0800 pipe_subtask_executor_max_thread_num --- .../src/assembly/resources/conf/iotdb-common.properties | 1 + .../src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java | 8 ++++++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties index 98d35e77ab5..cce12379ec6 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-common.properties @@ -946,6 +946,7 @@ cluster_name=defaultCluster # pipe_lib_dir=ext/pipe # The maximum number of threads that can be used to execute the pipe subtasks in PipeSubtaskExecutor. +# The actual value will be min(pipe_subtask_executor_max_thread_num, max(1, CPU core number / 2)). # pipe_subtask_executor_max_thread_num=5 # The connection timeout (in milliseconds) for the thrift client. diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java index c16a1f03798..97932889caa 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java @@ -150,7 +150,8 @@ public class CommonConfig { private boolean pipeHardLinkWALEnabled = false; /** The maximum number of threads that can be used to execute subtasks in PipeSubtaskExecutor. */ - private int pipeSubtaskExecutorMaxThreadNum = 5; + private int pipeSubtaskExecutorMaxThreadNum = + Math.min(5, Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); private int pipeSubtaskExecutorBasicCheckPointIntervalByConsumedEventCount = 10_000; private long pipeSubtaskExecutorBasicCheckPointIntervalByTimeDuration = 10 * 1000L; @@ -685,7 +686,10 @@ public class CommonConfig { } public void setPipeSubtaskExecutorMaxThreadNum(int pipeSubtaskExecutorMaxThreadNum) { - this.pipeSubtaskExecutorMaxThreadNum = pipeSubtaskExecutorMaxThreadNum; + this.pipeSubtaskExecutorMaxThreadNum = + Math.min( + pipeSubtaskExecutorMaxThreadNum, + Math.max(1, Runtime.getRuntime().availableProcessors() / 2)); } public long getPipeSubtaskExecutorPendingQueueMaxBlockingTimeMs() {
