This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0e95a6a7747 [exec](pipeline) runtime filter wait time (#34872)
0e95a6a7747 is described below
commit 0e95a6a77477d72488f3cb43af4f50e2541dcf04
Author: HappenLee <[email protected]>
AuthorDate: Mon May 20 23:43:59 2024 +0800
[exec](pipeline) runtime filter wait time (#34872)
---
be/src/exprs/runtime_filter.h | 2 ++
be/src/pipeline/dependency.cpp | 39 +++++++++++++++++++++++------
be/src/pipeline/dependency.h | 17 +++++++++----
be/src/vec/exec/runtime_filter_consumer.cpp | 25 +++++++++++++++---
4 files changed, 68 insertions(+), 15 deletions(-)
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 0deb7c3ddd7..3cedde8e8a5 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -244,6 +244,8 @@ public:
bool has_remote_target() const { return _has_remote_target; }
+ bool has_local_target() const { return _has_local_target; }
+
bool is_ready() const {
return (!_enable_pipeline_exec && _rf_state ==
RuntimeFilterState::READY) ||
(_enable_pipeline_exec &&
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index d37e3dc0401..e736174764d 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -119,6 +119,27 @@ void RuntimeFilterTimer::call_ready() {
_parent->set_ready();
}
+// should check rf timeout in two case:
+// 1. the rf is ready just remove the wait queue
+// 2. if the rf have local dependency, the rf should start wait when all local
dependency is ready
+bool RuntimeFilterTimer::should_be_check_timeout() {
+ if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) {
+ bool all_ready = true;
+ for (auto& dep : _local_runtime_filter_dependencies) {
+ if (!dep->ready()) {
+ all_ready = false;
+ break;
+ }
+ }
+ if (all_ready) {
+ _local_runtime_filter_dependencies.clear();
+ _registration_time = MonotonicMillis();
+ }
+ return all_ready;
+ }
+ return true;
+}
+
void RuntimeFilterTimerQueue::start() {
while (!_stop) {
std::unique_lock<std::mutex> lk(cv_m);
@@ -135,14 +156,18 @@ void RuntimeFilterTimerQueue::start() {
for (auto& it : _que) {
if (it.use_count() == 1) {
// `use_count == 1` means this runtime filter has been
released
- } else if (it->_parent->is_blocked_by(nullptr)) {
- // This means runtime filter is not ready, so we call
timeout or continue to poll this timer.
- int64_t ms_since_registration = MonotonicMillis() -
it->registration_time();
- if (ms_since_registration > it->wait_time_ms()) {
- it->call_timeout();
- } else {
- new_que.push_back(std::move(it));
+ } else if (it->should_be_check_timeout()) {
+ if (it->_parent->is_blocked_by(nullptr)) {
+ // This means runtime filter is not ready, so we call
timeout or continue to poll this timer.
+ int64_t ms_since_registration = MonotonicMillis() -
it->registration_time();
+ if (ms_since_registration > it->wait_time_ms()) {
+ it->call_timeout();
+ } else {
+ new_que.push_back(std::move(it));
+ }
}
+ } else {
+ new_que.push_back(std::move(it));
}
}
new_que.swap(_que);
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index de2e81bcda3..580cb8368c8 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -107,6 +107,7 @@ public:
BasicSharedState* shared_state() { return _shared_state; }
void set_shared_state(BasicSharedState* shared_state) { _shared_state =
shared_state; }
virtual std::string debug_string(int indentation_level = 0);
+ bool ready() const { return _ready; }
// Start the watcher. We use it to count how long this dependency block
the current pipeline task.
void start_watcher() { _watcher.start(); }
@@ -231,11 +232,19 @@ public:
int64_t registration_time() const { return _registration_time; }
int32_t wait_time_ms() const { return _wait_time_ms; }
+ void set_local_runtime_filter_dependencies(
+ const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps)
{
+ _local_runtime_filter_dependencies = deps;
+ }
+
+ bool should_be_check_timeout();
+
private:
friend struct RuntimeFilterTimerQueue;
std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
+ std::vector<std::shared_ptr<RuntimeFilterDependency>>
_local_runtime_filter_dependencies;
std::mutex _lock;
- const int64_t _registration_time;
+ int64_t _registration_time;
const int32_t _wait_time_ms;
};
@@ -258,11 +267,9 @@ struct RuntimeFilterTimerQueue {
~RuntimeFilterTimerQueue() = default;
RuntimeFilterTimerQueue() { _thread =
std::thread(&RuntimeFilterTimerQueue::start, this); }
- void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>
filter) { push(filter); }
-
- void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) {
+ void
push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&&
filter) {
std::unique_lock<std::mutex> lc(_que_lock);
- _que.push_back(filter);
+ _que.insert(_que.end(), filter.begin(), filter.end());
cv.notify_all();
}
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 0c6766b8bca..f80be824d58 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -67,17 +67,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
runtime_filter_dependencies,
const int id, const int node_id, const std::string& name) {
runtime_filter_dependencies.resize(_runtime_filter_descs.size());
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>
runtime_filter_timers(
+ _runtime_filter_descs.size());
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>
+ local_runtime_filter_dependencies;
+
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] =
std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, runtime_filter);
_runtime_filter_ctxs[i].runtime_filter_dependency =
runtime_filter_dependencies[i].get();
- auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter_timers[i] =
std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(),
runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
- runtime_filter->set_filter_timer(filter_timer);
-
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
+ runtime_filter->set_filter_timer(runtime_filter_timers[i]);
+ if (runtime_filter->has_local_target()) {
+
local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]);
+ }
+ }
+
+ // The gloabl runtime filter timer need set local runtime filter
dependencies.
+ // start to wait before the local runtime filter ready
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
+ if (!runtime_filter->has_local_target()) {
+ runtime_filter_timers[i]->set_local_runtime_filter_dependencies(
+ local_runtime_filter_dependencies);
+ }
}
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(
+ std::move(runtime_filter_timers));
}
Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]