This is an automated email from the ASF dual-hosted git repository.
ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f30e50676ec [opt](scanner) optimize the number of threads of scanners
(#28640)
f30e50676ec is described below
commit f30e50676ec93aa647ef96f78996616686f92c12
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Dec 26 10:24:12 2023 +0800
[opt](scanner) optimize the number of threads of scanners (#28640)
1. Remove `doris_max_remote_scanner_thread_pool_thread_num`, use
`doris_scanner_thread_pool_thread_num` only.
2. Set the default value of `doris_scanner_thread_pool_thread_num` to
`std::max(48, CpuInfo::num_cores() * 4)`
---
be/src/common/config.cpp | 12 +++++++++++-
be/src/common/config.h | 6 +++++-
be/src/io/fs/buffered_reader.h | 7 +++----
be/src/vec/exec/scan/scanner_scheduler.cpp | 2 ++
.../src/main/java/org/apache/doris/hudi/HudiJniScanner.java | 4 ++--
5 files changed, 23 insertions(+), 8 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2f12cea1fa3..919b83f0c7d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -41,6 +41,7 @@
#include "common/status.h"
#include "io/fs/file_writer.h"
#include "io/fs/local_file_system.h"
+#include "util/cpu_info.h"
namespace doris::config {
@@ -228,7 +229,14 @@ DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk,
"true");
DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
-DEFINE_Int32(doris_scanner_thread_pool_thread_num, "48");
+DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
+DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) ->
bool {
+ if (config == -1) {
+ CpuInfo::init();
+ doris_scanner_thread_pool_thread_num = std::max(48,
CpuInfo::num_cores() * 4);
+ }
+ return true;
+});
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
// number of olap scanner thread pool queue size
DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
@@ -864,6 +872,8 @@ DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128");
// Max buffer size for parquet chunk column
DEFINE_mInt32(parquet_column_max_buffer_mb, "8");
DEFINE_mDouble(max_amplified_read_ratio, "0.8");
+DEFINE_mInt32(merged_oss_min_io_size, "1048576");
+DEFINE_mInt32(merged_hdfs_min_io_size, "8192");
// OrcReader
DEFINE_mInt32(orc_natural_read_size_mb, "8");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index abdf1c67a2e..affe47851fb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -276,7 +276,7 @@ DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
-DECLARE_Int32(doris_scanner_thread_pool_thread_num);
+DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
// max number of remote scanner thread pool size
// if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10)
DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num);
@@ -921,6 +921,10 @@ DECLARE_mInt32(parquet_rowgroup_max_buffer_mb);
DECLARE_mInt32(parquet_column_max_buffer_mb);
// Merge small IO, the max amplified read ratio
DECLARE_mDouble(max_amplified_read_ratio);
+// Equivalent min size of each IO that can reach the maximum storage speed
limit
+// 1MB for oss, 8KB for hdfs
+DECLARE_mInt32(merged_oss_min_io_size);
+DECLARE_mInt32(merged_hdfs_min_io_size);
// OrcReader
DECLARE_mInt32(orc_natural_read_size_mb);
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index e78c1c79251..2f8eb465cdf 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -131,8 +131,6 @@ public:
static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024; // 8MB
static constexpr size_t BOX_SIZE = 1 * 1024 * 1024; // 1MB
static constexpr size_t SMALL_IO = 2 * 1024 * 1024; // 2MB
- static constexpr size_t HDFS_MIN_IO_SIZE = 4 * 1024; // 4KB
- static constexpr size_t OSS_MIN_IO_SIZE = 512 * 1024; // 512KB
static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
@@ -146,8 +144,9 @@ public:
_is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
_max_amplified_ratio = config::max_amplified_read_ratio;
// Equivalent min size of each IO that can reach the maximum storage
speed limit:
- // 512KB for oss, 4KB for hdfs
- _equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
+ // 1MB for oss, 8KB for hdfs
+ _equivalent_io_size =
+ _is_oss ? config::merged_oss_min_io_size :
config::merged_hdfs_min_io_size;
for (const PrefetchRange& range : _random_access_ranges) {
_statistics.apply_bytes += range.end_offset - range.start_offset;
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 42285f1d22a..84f56813c34 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -122,6 +122,8 @@ Status ScannerScheduler::init(ExecEnv* env) {
_remote_thread_pool_max_size =
config::doris_max_remote_scanner_thread_pool_thread_num != -1
?
config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512,
CpuInfo::num_cores() * 10);
+ _remote_thread_pool_max_size =
+ std::max(_remote_thread_pool_max_size,
config::doris_scanner_thread_pool_thread_num);
_remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
_remote_thread_pool_max_size,
config::doris_remote_scanner_thread_pool_queue_size,
"RemoteScanThreadPool");
diff --git
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index daf8d4a21fa..17ca650675d 100644
---
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -85,8 +85,8 @@ public class HudiJniScanner extends JniScanner {
private static final ScheduledExecutorService cleanResolverService =
Executors.newScheduledThreadPool(1);
static {
- int numThreads = Math.max(Runtime.getRuntime().availableProcessors() *
2 + 1, 4);
- if (numThreads > 32) {
+ int numThreads = Math.max(Runtime.getRuntime().availableProcessors() *
2, 4);
+ if (numThreads > 48) {
numThreads = Runtime.getRuntime().availableProcessors();
}
avroReadPool = Executors.newFixedThreadPool(numThreads,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]