This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 8ca399ab920 [exec](pipeline) runtime filter wait time (#35108)
8ca399ab920 is described below
commit 8ca399ab920e5719baacf9740f1cd2fad8d5dd6c
Author: HappenLee <[email protected]>
AuthorDate: Tue May 21 12:50:05 2024 +0800
[exec](pipeline) runtime filter wait time (#35108)
---
be/src/exprs/runtime_filter.h | 2 ++
be/src/pipeline/pipeline_x/dependency.cpp | 21 +++++++++++++++++++++
be/src/pipeline/pipeline_x/dependency.h | 17 ++++++++++++-----
be/src/vec/exec/runtime_filter_consumer.cpp | 25 ++++++++++++++++++++++---
4 files changed, 57 insertions(+), 8 deletions(-)
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 781f7ac34ff..4733d39e298 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -244,6 +244,8 @@ public:
bool has_remote_target() const { return _has_remote_target; }
+ bool has_local_target() const { return _has_local_target; }
+
bool is_ready() const {
return (!_enable_pipeline_exec && _rf_state ==
RuntimeFilterState::READY) ||
(_enable_pipeline_exec &&
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index ba47f935598..093e26ff854 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -125,6 +125,27 @@ Dependency*
RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
return ready ? nullptr : this;
}
+// The rf timeout should be checked in two cases:
+// 1. the rf is already ready, so the timer can simply be removed from the wait queue
+// 2. the rf has local dependencies: the rf starts waiting only once all local dependencies are ready
+bool RuntimeFilterTimer::should_be_check_timeout() {
+ if (!_parent->ready() && !_local_runtime_filter_dependencies.empty()) {
+ bool all_ready = true;
+ for (auto& dep : _local_runtime_filter_dependencies) {
+ if (!dep->ready()) {
+ all_ready = false;
+ break;
+ }
+ }
+ if (all_ready) {
+ _local_runtime_filter_dependencies.clear();
+ _registration_time = MonotonicMillis();
+ }
+ return all_ready;
+ }
+ return true;
+}
+
void RuntimeFilterTimer::call_timeout() {
_parent->set_ready();
}
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 693bde10f36..525a6dea562 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -112,6 +112,7 @@ public:
BasicSharedState* shared_state() { return _shared_state; }
void set_shared_state(BasicSharedState* shared_state) { _shared_state =
shared_state; }
virtual std::string debug_string(int indentation_level = 0);
+ bool ready() const { return _ready; }
// Start the watcher. We use it to count how long this dependency block
the current pipeline task.
void start_watcher() { _watcher.start(); }
@@ -256,11 +257,19 @@ public:
int64_t registration_time() const { return _registration_time; }
int32_t wait_time_ms() const { return _wait_time_ms; }
+ void set_local_runtime_filter_dependencies(
+ const std::vector<std::shared_ptr<RuntimeFilterDependency>>& deps)
{
+ _local_runtime_filter_dependencies = deps;
+ }
+
+ bool should_be_check_timeout();
+
private:
friend struct RuntimeFilterTimerQueue;
std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
+ std::vector<std::shared_ptr<RuntimeFilterDependency>>
_local_runtime_filter_dependencies;
std::mutex _lock;
- const int64_t _registration_time;
+ int64_t _registration_time;
const int32_t _wait_time_ms;
};
@@ -283,11 +292,9 @@ struct RuntimeFilterTimerQueue {
~RuntimeFilterTimerQueue() = default;
RuntimeFilterTimerQueue() { _thread =
std::thread(&RuntimeFilterTimerQueue::start, this); }
- void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>
filter) { push(filter); }
-
- void push(std::shared_ptr<pipeline::RuntimeFilterTimer> filter) {
+ void
push_filter_timer(std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>&&
filter) {
std::unique_lock<std::mutex> lc(_que_lock);
- _que.push_back(filter);
+ _que.insert(_que.end(), filter.begin(), filter.end());
cv.notify_all();
}
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 2913fad3d8d..66fd0297c98 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -81,17 +81,36 @@ void RuntimeFilterConsumer::init_runtime_filter_dependency(
runtime_filter_dependencies,
const int id, const int node_id, const std::string& name) {
runtime_filter_dependencies.resize(_runtime_filter_descs.size());
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>>
runtime_filter_timers(
+ _runtime_filter_descs.size());
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>
+ local_runtime_filter_dependencies;
+
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
runtime_filter_dependencies[i] =
std::make_shared<pipeline::RuntimeFilterDependency>(
id, node_id, name, _state->get_query_ctx(), runtime_filter);
_runtime_filter_ctxs[i].runtime_filter_dependency =
runtime_filter_dependencies[i].get();
- auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter_timers[i] =
std::make_shared<pipeline::RuntimeFilterTimer>(
runtime_filter->registration_time(),
runtime_filter->wait_time_ms(),
runtime_filter_dependencies[i]);
- runtime_filter->set_filter_timer(filter_timer);
-
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
+ runtime_filter->set_filter_timer(runtime_filter_timers[i]);
+ if (runtime_filter->has_local_target()) {
+
local_runtime_filter_dependencies.emplace_back(runtime_filter_dependencies[i]);
+ }
+ }
+
+ // The global runtime filter timers need the local runtime filter dependencies set,
+ // so that they start to wait only once the local runtime filters are ready
+ for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
+ IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
+ if (!runtime_filter->has_local_target()) {
+ runtime_filter_timers[i]->set_local_runtime_filter_dependencies(
+ local_runtime_filter_dependencies);
+ }
}
+ ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(
+ std::move(runtime_filter_timers));
}
Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]