This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 453e3c18f4c [refactor](buffer) remove download buffer since it is no
longer useful (#28832)
453e3c18f4c is described below
commit 453e3c18f4c095fbcd99dc4d6f0be67a30a6b907
Author: zclllyybb <[email protected]>
AuthorDate: Fri Dec 22 11:53:31 2023 +0800
[refactor](buffer) remove download buffer since it is no longer useful
(#28832)
remove download buffer since it is no longer useful
---
be/src/common/config.cpp | 6 ---
be/src/common/config.h | 6 ---
be/src/runtime/exec_env.h | 26 +-----------
be/src/runtime/exec_env_init.cpp | 53 +++++-------------------
be/src/util/doris_metrics.h | 2 -
docs/en/docs/admin-manual/config/be-config.md | 6 ---
docs/zh-CN/docs/admin-manual/config/be-config.md | 6 ---
7 files changed, 12 insertions(+), 93 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 1241ae39e67..03eaee7b23c 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -792,12 +792,6 @@ DEFINE_Validator(max_send_batch_parallelism_per_job,
DEFINE_Int32(send_batch_thread_pool_thread_num, "64");
// number of send batch thread pool queue size
DEFINE_Int32(send_batch_thread_pool_queue_size, "102400");
-// number of download cache thread pool size
-DEFINE_Int32(download_cache_thread_pool_thread_num, "48");
-// number of download cache thread pool queue size
-DEFINE_Int32(download_cache_thread_pool_queue_size, "102400");
-// download cache buffer size
-DEFINE_Int64(download_cache_buffer_size, "10485760");
// Limit the number of segment of a newly created rowset.
// The newly created rowset may to be compacted after loading,
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 856e2482745..e011073d44d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -851,12 +851,6 @@ DECLARE_mInt32(max_send_batch_parallelism_per_job);
DECLARE_Int32(send_batch_thread_pool_thread_num);
// number of send batch thread pool queue size
DECLARE_Int32(send_batch_thread_pool_queue_size);
-// number of download cache thread pool size
-DECLARE_Int32(download_cache_thread_pool_thread_num);
-// number of download cache thread pool queue size
-DECLARE_Int32(download_cache_thread_pool_queue_size);
-// download cache buffer size
-DECLARE_Int64(download_cache_buffer_size);
// Limit the number of segment of a newly created rowset.
// The newly created rowset may to be compacted after loading,
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index ce534c7ce04..95c4e97e1b3 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -18,10 +18,10 @@
#pragma once
#include <common/multi_version.h>
-#include <stddef.h>
#include <algorithm>
#include <atomic>
+#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
@@ -168,7 +168,6 @@ public:
MemTracker* brpc_iobuf_block_memory_tracker() { return
_brpc_iobuf_block_memory_tracker.get(); }
ThreadPool* send_batch_thread_pool() { return
_send_batch_thread_pool.get(); }
- ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
ThreadPool* buffered_reader_prefetch_thread_pool() {
return _buffered_reader_prefetch_thread_pool.get();
}
@@ -177,23 +176,8 @@ public:
ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get();
}
ThreadPool* lazy_release_obj_pool() { return _lazy_release_obj_pool.get();
}
- void set_serial_download_cache_thread_token() {
- _serial_download_cache_thread_token =
- download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1);
- }
- ThreadPoolToken* get_serial_download_cache_thread_token() {
- return _serial_download_cache_thread_token.get();
- }
- void init_download_cache_buf();
- void init_download_cache_required_components();
Status init_pipeline_task_scheduler();
void init_file_cache_factory();
- char* get_download_cache_buf(ThreadPoolToken* token) {
- if (_download_cache_buf_map.find(token) == _download_cache_buf_map.end()) {
- return nullptr;
- }
- return _download_cache_buf_map[token].get();
- }
io::FileCacheFactory* file_cache_factory() { return _file_cache_factory; }
UserFunctionCache* user_function_cache() { return _user_function_cache; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
@@ -322,23 +306,17 @@ private:
std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
-
- // Threadpool used to download cache from remote storage
- std::unique_ptr<ThreadPool> _download_cache_thread_pool;
// Threadpool used to prefetch remote file for buffered reader
std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool;
// Threadpool used to upload local file to s3
std::unique_ptr<ThreadPool> _s3_file_upload_thread_pool;
- // A token used to submit download cache task serially
- std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token;
// Pool used by fragment manager to send profile or status to FE
coordinator
std::unique_ptr<ThreadPool> _send_report_thread_pool;
// Pool used by join node to build hash table
std::unique_ptr<ThreadPool> _join_node_thread_pool;
// Pool to use a new thread to release object
std::unique_ptr<ThreadPool> _lazy_release_obj_pool;
- // ThreadPoolToken -> buffer
- std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map;
+
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
pipeline::TaskScheduler* _with_group_task_scheduler = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 13cb861aac2..4df69d67c03 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -17,14 +17,14 @@
// IWYU pragma: no_include <bthread/errno.h>
#include <common/multi_version.h>
-#include <errno.h> // IWYU pragma: keep
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/Metrics_types.h>
-#include <stdint.h>
-#include <stdlib.h>
-#include <string.h>
#include <sys/resource.h>
+#include <cerrno> // IWYU pragma: keep
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
#include <limits>
#include <map>
#include <memory>
@@ -109,15 +109,13 @@ class PFunctionService_Stub;
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT);
static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
bool init_system_metrics = config::enable_system_metrics;
std::set<std::string> disk_devices;
std::vector<std::string> network_interfaces;
std::vector<std::string> paths;
- for (auto& store_path : store_paths) {
+ for (const auto& store_path : store_paths) {
paths.emplace_back(store_path.path);
}
if (init_system_metrics) {
@@ -167,8 +165,6 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths,
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool));
- init_download_cache_required_components();
-
static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
.set_min_threads(16)
.set_max_threads(64)
@@ -336,9 +332,11 @@ void ExecEnv::init_file_cache_factory() {
continue;
}
- olap_res = file_cache_init_pool->submit_func(std::bind(
- &io::FileCacheFactory::create_file_cache, _file_cache_factory, cache_path.path,
- cache_path.init_settings(), &(cache_status.emplace_back())));
+ olap_res = file_cache_init_pool->submit_func(
+ [this, capture0 = cache_path.path, capture1 = cache_path.init_settings(),
+ capture2 = &(cache_status.emplace_back())] {
+ _file_cache_factory->create_file_cache(capture0, capture1, capture2);
+ });
if (!olap_res.ok()) {
LOG(FATAL) << "failed to init file cache, err: " << olap_res;
@@ -355,7 +353,6 @@ void ExecEnv::init_file_cache_factory() {
}
}
}
- return;
}
Status ExecEnv::_init_mem_env() {
@@ -488,43 +485,18 @@ void ExecEnv::init_mem_tracker() {
std::make_shared<MemTracker>("IOBufBlockMemory",
_orphan_mem_tracker_raw);
}
-void ExecEnv::init_download_cache_buf() {
- std::unique_ptr<char[]> download_cache_buf(new char[config::download_cache_buffer_size]);
- memset(download_cache_buf.get(), 0, config::download_cache_buffer_size);
- _download_cache_buf_map[_serial_download_cache_thread_token.get()] =
- std::move(download_cache_buf);
-}
-
-void ExecEnv::init_download_cache_required_components() {
- static_cast<void>(ThreadPoolBuilder("DownloadCacheThreadPool")
- .set_min_threads(1)
- .set_max_threads(config::download_cache_thread_pool_thread_num)
- .set_max_queue_size(config::download_cache_thread_pool_queue_size)
- .build(&_download_cache_thread_pool));
- set_serial_download_cache_thread_token();
- init_download_cache_buf();
-}
-
void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
[this]() { return
_send_batch_thread_pool->num_threads(); });
REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size,
[this]() { return
_send_batch_thread_pool->get_queue_size(); });
-
- REGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num,
- [this]() { return _download_cache_thread_pool->num_threads(); });
-
- REGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size,
- [this]() { return _download_cache_thread_pool->get_queue_size(); });
}
void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
- DEREGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num);
- DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size);
}
// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a
Stop method.
@@ -572,8 +544,6 @@ void ExecEnv::destroy() {
SAFE_SHUTDOWN(_lazy_release_obj_pool);
SAFE_SHUTDOWN(_send_report_thread_pool);
SAFE_SHUTDOWN(_send_batch_thread_pool);
- SAFE_SHUTDOWN(_serial_download_cache_thread_token);
- SAFE_SHUTDOWN(_download_cache_thread_pool);
// Free resource after threads are stopped.
// Some threads are still running, like threads created by
_new_load_stream_mgr ...
@@ -645,9 +615,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_external_scan_context_mgr);
SAFE_DELETE(_user_function_cache);
- _serial_download_cache_thread_token.reset(nullptr);
- _download_cache_thread_pool.reset(nullptr);
-
// _heartbeat_flags must be destoried after staroge engine
SAFE_DELETE(_heartbeat_flags);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index db52ceb405d..72ee81cd49b 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -194,8 +194,6 @@ public:
UIntGauge* add_batch_task_queue_size = nullptr;
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
- UIntGauge* download_cache_thread_pool_thread_num = nullptr;
- UIntGauge* download_cache_thread_pool_queue_size = nullptr;
UIntGauge* fragment_thread_pool_queue_size = nullptr;
// Upload metrics
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index 1dcf7dbb241..c55bc8d00b8 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -980,12 +980,6 @@ BaseCompaction:546859:
* Description: Interval in milliseconds between memtable flush mgr refresh
iterations
* Default value: 100
-#### `download_cache_buffer_size`
-
-* Type: int64
-* Description: The size of the buffer used to receive data when downloading the cache.
-* Default value: 10485760
-
#### `zone_map_row_num_threshold`
* Type: int32
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 802279c4cd0..567c3c598c5 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -1005,12 +1005,6 @@ BaseCompaction:546859:
* 描述:memtable主动下刷时刷新内存统计的周期(毫秒)
* 默认值:100
-#### `download_cache_buffer_size`
-
-* 类型: int64
-* 描述: 下载缓存时用于接收数据的buffer的大小。
-* 默认值: 10485760
-
#### `zone_map_row_num_threshold`
* 类型: int32
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]