zhiqiang-hhhh commented on code in PR #27653:
URL: https://github.com/apache/doris/pull/27653#discussion_r1406980370
##########
be/src/pipeline/pipeline_x/dependency.h:
##########
@@ -190,6 +190,58 @@ class RuntimeFilterTimer {
IRuntimeFilter* _runtime_filter;
};
+struct RuntimeFilterTimerQueue {
+ constexpr static int64_t interval = 50;
+ void run() { _thread.detach(); }
+ void start() {
+ while (!_stop) {
+ std::unique_lock<std::mutex> lk(cv_m);
+
+ cv.wait(lk, [this] { return !_que.empty() || _stop; });
+ {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
+ for (auto& it : _que) {
+ if (it.use_count() == 1) {
+ it->call_has_release();
+ } else if (it->has_ready()) {
+ it->call_has_ready();
+ } else {
+ int64_t ms_since_registration = MonotonicMillis() - it->registration_time();
+ if (ms_since_registration > it->wait_time_ms()) {
+ it->call_timeout();
+ } else {
+ new_que.push_back(std::move(it));
+ }
+ }
+ }
+ new_que.swap(_que);
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+ }
+ delete this;
Review Comment:
The lifecycle of `RuntimeFilterTimerQueue` should be controlled by `ExecEnv`, not ended by `delete this` inside the worker thread.
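A minimal sketch of owner-controlled lifecycle, as an alternative to `detach()` plus `delete this`. `TimerQueueSketch` and `OwnerSketch` are hypothetical stand-ins, not the classes in this PR, and the loop body is a placeholder for the real timer scan. The idea is only that the thread stays joinable and the owner stops it and then frees it.

```cpp
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>

// Hypothetical stand-in for RuntimeFilterTimerQueue (illustrative names only).
class TimerQueueSketch {
public:
    ~TimerQueueSketch() { stop(); }

    // Start the worker. The thread is kept joinable so the owner can wait for it.
    void start() {
        _thread = std::thread([this] { _loop(); });
    }

    // Ask the worker to exit and wait until it has. Safe to call repeatedly.
    void stop() {
        {
            std::lock_guard<std::mutex> lk(_mtx);
            _stop = true;
        }
        _cv.notify_all();
        if (_thread.joinable()) {
            _thread.join();
        }
    }

private:
    void _loop() {
        std::unique_lock<std::mutex> lk(_mtx);
        while (!_stop) {
            // Placeholder: the real queue would scan its timers here.
            // Wake up early on stop, otherwise after the polling interval.
            _cv.wait_for(lk, std::chrono::milliseconds(50), [this] { return _stop; });
        }
        // No `delete this`: the owner decides when the object dies.
    }

    std::mutex _mtx;
    std::condition_variable _cv;
    bool _stop = false;
    std::thread _thread;
};

// Hypothetical stand-in for the owning environment (ExecEnv in this PR).
struct OwnerSketch {
    std::unique_ptr<TimerQueueSketch> queue;

    void init() {
        queue = std::make_unique<TimerQueueSketch>();
        queue->start();
    }

    void destroy() {
        if (queue) {
            queue->stop();  // worker has exited after this returns
            queue.reset();  // now it is safe to free the object
        }
    }
};
```

With this shape nothing can touch the queue after it is freed, because the free happens only after `join()` returns.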
##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -544,6 +546,7 @@ void ExecEnv::destroy() {
SAFE_STOP(_task_group_manager);
SAFE_STOP(_external_scan_context_mgr);
SAFE_STOP(_fragment_mgr);
+ SAFE_STOP(_runtime_filter_timer_queue);
Review Comment:
`_runtime_filter_timer_queue` should be deleted explicitly by `ExecEnv`; see `SAFE_DELETE`.
##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -212,7 +161,7 @@ void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
registration_time, wait_time_ms,
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()),
runtime_filter);
runtime_filter->set_filter_timer(filter_timer);
- RuntimeFilterTimerQueue::push_filter_timer(filter_timer);
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
Review Comment:
`#include "runtime/exec_env.h"`
##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -544,6 +546,7 @@ void ExecEnv::destroy() {
SAFE_STOP(_task_group_manager);
SAFE_STOP(_external_scan_context_mgr);
SAFE_STOP(_fragment_mgr);
+ SAFE_STOP(_runtime_filter_timer_queue);
Review Comment:
Append `SAFE_DELETE(_runtime_filter_timer_queue);` to the resource-destruction section of this function.
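To illustrate the stop-then-delete pattern, here is a rough sketch. `SKETCH_SAFE_STOP` and `SKETCH_SAFE_DELETE` are hypothetical macros written for this example; they are assumed to behave like `SAFE_STOP` and `SAFE_DELETE`, whose real definitions are not shown in this thread. `ServiceSketch` and `EnvSketch` are likewise made up, and the delete order is chosen only for illustration.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical macros, assumed to mirror the intent of SAFE_STOP / SAFE_DELETE.
#define SKETCH_SAFE_STOP(ptr)      \
    do {                           \
        if ((ptr) != nullptr) {    \
            (ptr)->stop();         \
        }                          \
    } while (0)

#define SKETCH_SAFE_DELETE(ptr)    \
    do {                           \
        delete (ptr);              \
        (ptr) = nullptr;           \
    } while (0)

// Illustrative service that records when it is stopped and deleted.
struct ServiceSketch {
    ServiceSketch(std::vector<std::string>* log, std::string name)
            : _log(log), _name(std::move(name)) {}
    ~ServiceSketch() { _log->push_back("delete " + _name); }
    void stop() { _log->push_back("stop " + _name); }

    std::vector<std::string>* _log;
    std::string _name;
};

// Illustrative environment with a two-phase destroy().
struct EnvSketch {
    std::vector<std::string> log;
    ServiceSketch* fragment_mgr = nullptr;
    ServiceSketch* timer_queue = nullptr;

    void init() {
        fragment_mgr = new ServiceSketch(&log, "fragment_mgr");
        timer_queue = new ServiceSketch(&log, "timer_queue");
    }

    void destroy() {
        // Phase 1: stop everything while all objects are still alive.
        SKETCH_SAFE_STOP(fragment_mgr);
        SKETCH_SAFE_STOP(timer_queue);
        // Phase 2: release resources; pointers are reset to nullptr.
        SKETCH_SAFE_DELETE(timer_queue);
        SKETCH_SAFE_DELETE(fragment_mgr);
    }
};
```

Because the pointers are nulled after deletion, calling `destroy()` a second time does nothing.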
##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -158,6 +158,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host);
_broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host);
+ _runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue();
+ _runtime_filter_timer_queue->run();
Review Comment:
Does `_runtime_filter_timer_queue` have a startup dependency?
Should it be started before `pipeline_task_scheduler` or after it?
It may belong somewhere in `ExecEnv::init_pipeline_task_scheduler`.
##########
be/src/runtime/exec_env_init.cpp:
##########
@@ -158,6 +158,8 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host);
_broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host);
+ _runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue();
Review Comment:
Creation of `_runtime_filter_timer_queue` should be moved to
`ExecEnv::init_pipeline_task_scheduler`.
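A rough sketch of why the creation point matters, under the assumption that pipeline tasks may push timers as soon as the scheduler is running (callers dereference `runtime_filter_timer_queue()` directly in `add_filters`). All names below are hypothetical stand-ins; the real scheduler and queue are more involved, and whether this ordering is the right one for Doris is the open question above.

```cpp
#include <memory>
#include <vector>

// Hypothetical stand-in for the timer queue; timers are plain ints here.
struct TimerQueueSketch {
    std::vector<int> timers;
    void push_filter_timer(int timer_id) { timers.push_back(timer_id); }
};

// Hypothetical stand-in for ExecEnv.
struct EnvSketch {
    std::unique_ptr<TimerQueueSketch> timer_queue;
    bool scheduler_started = false;

    TimerQueueSketch* runtime_filter_timer_queue() { return timer_queue.get(); }

    // Illustrative analogue of init_pipeline_task_scheduler: the queue is
    // created before the scheduler is marked as started, so no task can
    // ever observe a null queue.
    void init_pipeline_task_scheduler() {
        timer_queue = std::make_unique<TimerQueueSketch>();
        scheduler_started = true;
    }

    // Stands in for a pipeline task registering a runtime filter timer.
    // Returns false if the task cannot run or the queue is missing.
    bool run_task(int timer_id) {
        if (!scheduler_started) {
            return false;
        }
        TimerQueueSketch* queue = runtime_filter_timer_queue();
        if (queue == nullptr) {
            return false;
        }
        queue->push_filter_timer(timer_id);
        return true;
    }
};
```

Keeping creation and scheduler startup in one function makes the ordering visible in a single place.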