This is an automated email from the ASF dual-hosted git repository.

ashingau pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f30e50676ec [opt](scanner) optimize the number of threads of scanners 
(#28640)
f30e50676ec is described below

commit f30e50676ec93aa647ef96f78996616686f92c12
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Dec 26 10:24:12 2023 +0800

    [opt](scanner) optimize the number of threads of scanners (#28640)
    
    1. Remove `doris_max_remote_scanner_thread_pool_thread_num`, use 
`doris_scanner_thread_pool_thread_num` only.
    2. Set the default value of `doris_scanner_thread_pool_thread_num` to `std::max(48, CpuInfo::num_cores() * 4)`
---
 be/src/common/config.cpp                                     | 12 +++++++++++-
 be/src/common/config.h                                       |  6 +++++-
 be/src/io/fs/buffered_reader.h                               |  7 +++----
 be/src/vec/exec/scan/scanner_scheduler.cpp                   |  2 ++
 .../src/main/java/org/apache/doris/hudi/HudiJniScanner.java  |  4 ++--
 5 files changed, 23 insertions(+), 8 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2f12cea1fa3..919b83f0c7d 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -41,6 +41,7 @@
 #include "common/status.h"
 #include "io/fs/file_writer.h"
 #include "io/fs/local_file_system.h"
+#include "util/cpu_info.h"
 
 namespace doris::config {
 
@@ -228,7 +229,14 @@ DEFINE_Bool(doris_enable_scanner_thread_pool_per_disk, 
"true");
 DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
 // number of scanner thread pool size for olap table
 // and the min thread num of remote scanner thread pool
-DEFINE_Int32(doris_scanner_thread_pool_thread_num, "48");
+DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
+DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> 
bool {
+    if (config == -1) {
+        CpuInfo::init();
+        doris_scanner_thread_pool_thread_num = std::max(48, 
CpuInfo::num_cores() * 4);
+    }
+    return true;
+});
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
 // number of olap scanner thread pool queue size
 DEFINE_Int32(doris_scanner_thread_pool_queue_size, "102400");
@@ -864,6 +872,8 @@ DEFINE_mInt32(parquet_rowgroup_max_buffer_mb, "128");
 // Max buffer size for parquet chunk column
 DEFINE_mInt32(parquet_column_max_buffer_mb, "8");
 DEFINE_mDouble(max_amplified_read_ratio, "0.8");
+DEFINE_mInt32(merged_oss_min_io_size, "1048576");
+DEFINE_mInt32(merged_hdfs_min_io_size, "8192");
 
 // OrcReader
 DEFINE_mInt32(orc_natural_read_size_mb, "8");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index abdf1c67a2e..affe47851fb 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -276,7 +276,7 @@ DECLARE_Bool(doris_enable_scanner_thread_pool_per_disk);
 DECLARE_mInt64(doris_blocking_priority_queue_wait_timeout_ms);
 // number of scanner thread pool size for olap table
 // and the min thread num of remote scanner thread pool
-DECLARE_Int32(doris_scanner_thread_pool_thread_num);
+DECLARE_mInt32(doris_scanner_thread_pool_thread_num);
 // max number of remote scanner thread pool size
 // if equal to -1, value is std::max(512, CpuInfo::num_cores() * 10)
 DECLARE_Int32(doris_max_remote_scanner_thread_pool_thread_num);
@@ -921,6 +921,10 @@ DECLARE_mInt32(parquet_rowgroup_max_buffer_mb);
 DECLARE_mInt32(parquet_column_max_buffer_mb);
 // Merge small IO, the max amplified read ratio
 DECLARE_mDouble(max_amplified_read_ratio);
+// Equivalent min size of each IO that can reach the maximum storage speed 
limit
+// 1MB for oss, 8KB for hdfs
+DECLARE_mInt32(merged_oss_min_io_size);
+DECLARE_mInt32(merged_hdfs_min_io_size);
 
 // OrcReader
 DECLARE_mInt32(orc_natural_read_size_mb);
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index e78c1c79251..2f8eb465cdf 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -131,8 +131,6 @@ public:
     static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024;      // 8MB
     static constexpr size_t BOX_SIZE = 1 * 1024 * 1024;             // 1MB
     static constexpr size_t SMALL_IO = 2 * 1024 * 1024;             // 2MB
-    static constexpr size_t HDFS_MIN_IO_SIZE = 4 * 1024;            // 4KB
-    static constexpr size_t OSS_MIN_IO_SIZE = 512 * 1024;           // 512KB
     static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
 
     MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
@@ -146,8 +144,9 @@ public:
         _is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
         _max_amplified_ratio = config::max_amplified_read_ratio;
         // Equivalent min size of each IO that can reach the maximum storage 
speed limit:
-        // 512KB for oss, 4KB for hdfs
-        _equivalent_io_size = _is_oss ? OSS_MIN_IO_SIZE : HDFS_MIN_IO_SIZE;
+        // 1MB for oss, 8KB for hdfs
+        _equivalent_io_size =
+                _is_oss ? config::merged_oss_min_io_size : 
config::merged_hdfs_min_io_size;
         for (const PrefetchRange& range : _random_access_ranges) {
             _statistics.apply_bytes += range.end_offset - range.start_offset;
         }
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 42285f1d22a..84f56813c34 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -122,6 +122,8 @@ Status ScannerScheduler::init(ExecEnv* env) {
     _remote_thread_pool_max_size = 
config::doris_max_remote_scanner_thread_pool_thread_num != -1
                                            ? 
config::doris_max_remote_scanner_thread_pool_thread_num
                                            : std::max(512, 
CpuInfo::num_cores() * 10);
+    _remote_thread_pool_max_size =
+            std::max(_remote_thread_pool_max_size, 
config::doris_scanner_thread_pool_thread_num);
     _remote_scan_thread_pool = std::make_unique<PriorityThreadPool>(
             _remote_thread_pool_max_size, 
config::doris_remote_scanner_thread_pool_queue_size,
             "RemoteScanThreadPool");
diff --git 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
 
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
index daf8d4a21fa..17ca650675d 100644
--- 
a/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
+++ 
b/fe/be-java-extensions/hudi-scanner/src/main/java/org/apache/doris/hudi/HudiJniScanner.java
@@ -85,8 +85,8 @@ public class HudiJniScanner extends JniScanner {
     private static final ScheduledExecutorService cleanResolverService = 
Executors.newScheduledThreadPool(1);
 
     static {
-        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 
2 + 1, 4);
-        if (numThreads > 32) {
+        int numThreads = Math.max(Runtime.getRuntime().availableProcessors() * 
2, 4);
+        if (numThreads > 48) {
             numThreads = Runtime.getRuntime().availableProcessors();
         }
         avroReadPool = Executors.newFixedThreadPool(numThreads,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to