This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a8232c67f91 [pipelineX](runtime filter) Fix task timeout caused by
runtime filter (#33332) (#33369)
a8232c67f91 is described below
commit a8232c67f91bf66200227fa18a21007f38235a01
Author: Gabriel <[email protected]>
AuthorDate: Mon Apr 8 16:30:32 2024 +0800
[pipelineX](runtime filter) Fix task timeout caused by runtime filter
(#33332) (#33369)
---
be/src/exprs/runtime_filter.cpp | 23 ++++-
be/src/exprs/runtime_filter.h | 4 +-
.../exec/multi_cast_data_stream_source.cpp | 27 +++--
.../pipeline/exec/multi_cast_data_stream_source.h | 19 +++-
be/src/pipeline/exec/scan_operator.cpp | 21 ++--
be/src/pipeline/exec/scan_operator.h | 17 +++-
be/src/pipeline/pipeline_x/dependency.cpp | 112 +++++++--------------
be/src/pipeline/pipeline_x/dependency.h | 78 ++++----------
be/src/pipeline/pipeline_x/operator.h | 2 +-
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 43 +++-----
be/src/pipeline/pipeline_x/pipeline_x_task.h | 14 ++-
be/src/runtime/runtime_state.cpp | 2 +
be/src/runtime/runtime_state.h | 1 +
be/src/vec/exec/runtime_filter_consumer.cpp | 59 +++++++----
be/src/vec/exec/runtime_filter_consumer.h | 8 +-
be/src/vec/exec/scan/vscan_node.cpp | 2 +-
16 files changed, 218 insertions(+), 214 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f680db5adee..1c5c3f7d4a2 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1095,7 +1095,7 @@ Status
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
_set_push_down(!is_late_arrival);
RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs,
_probe_expr));
}
- _profile->add_info_string("Info", _format_status());
+ _profile->add_info_string("Info", formatted_state());
// The runtime filter is pushed down, adding filtering information.
auto* expr_filtered_rows_counter = ADD_COUNTER(_profile,
"expr_filtered_rows", TUnit::UNIT);
auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows",
TUnit::UNIT);
@@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
return true;
}
+void IRuntimeFilter::update_state() {
+ DCHECK(is_consumer());
+ auto execution_timeout = _state->execution_timeout * 1000;
+ auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
+ // bitmap filter is precise filter and only filter once, so it must be
applied.
+ int64_t wait_times_ms = _wrapper->get_real_type() ==
RuntimeFilterType::BITMAP_FILTER
+ ? execution_timeout
+ : runtime_filter_wait_time_ms;
+ auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+ DCHECK(_enable_pipeline_exec);
+ // In pipelineX, runtime filters will be ready or timeout before open
phase.
+ if (expected == RuntimeFilterState::NOT_READY) {
+ DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+ _rf_state_atomic = RuntimeFilterState::TIME_OUT;
+ }
+}
+
// NOTE: Wait infinitely will not make scan task wait really forever.
// Because BlockTaskSchedule will make it run when query is timedout.
bool IRuntimeFilter::wait_infinitely() const {
@@ -1236,7 +1253,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) {
_wrapper->_ignored_msg = msg;
}
-std::string IRuntimeFilter::_format_status() const {
+std::string IRuntimeFilter::formatted_state() const {
return fmt::format(
"[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {},
HasRemoteTarget = {}, "
"HasLocalTarget = {}]",
@@ -1411,7 +1428,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile*
parent_profile) {
} else {
_profile_init = true;
parent_profile->add_child(_profile.get(), true, nullptr);
- _profile->add_info_string("Info", _format_status());
+ _profile->add_info_string("Info", formatted_state());
}
}
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 620d61ae564..ff825523ae1 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -263,6 +263,7 @@ public:
// This function will wait at most
config::runtime_filter_shuffle_wait_time_ms
// if return true , filter is ready to use
bool await();
+ void update_state();
// this function will be called if a runtime filter sent by rpc
// it will notify all wait threads
void signal();
@@ -355,6 +356,7 @@ public:
int64_t registration_time() const { return registration_time_; }
void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
+ std::string formatted_state() const;
protected:
// serialize _wrapper to protobuf
@@ -373,8 +375,6 @@ protected:
void _set_push_down(bool push_down) { _is_push_down = push_down; }
- std::string _format_status() const;
-
std::string _get_explain_state_string() const {
if (_enable_pipeline_exec) {
return _rf_state_atomic.load(std::memory_order_acquire) ==
RuntimeFilterState::READY
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 049905c5cc4..c40af83bd58 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -77,7 +77,7 @@ Status
MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
if (_t_data_stream_sink.__isset.conjuncts) {
RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
}
- return _acquire_runtime_filter();
+ return _acquire_runtime_filter(false);
}
bool
MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
@@ -129,10 +129,7 @@
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
parent->runtime_filter_descs(),
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
- _filter_dependency = std::make_shared<RuntimeFilterDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
- state->get_query_ctx());
-};
+}
Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
@@ -145,12 +142,30 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state,
_output_expr_contexts[i]));
}
+ _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
- init_runtime_filter_dependency(_filter_dependency.get());
+ init_runtime_filter_dependency(_filter_dependencies, p.operator_id(),
p.node_id(),
+ p.get_name() + "_FILTER_DEPENDENCY");
return Status::OK();
}
+Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
+ if (_closed) {
+ return Status::OK();
+ }
+
+ SCOPED_TIMER(_close_timer);
+ SCOPED_TIMER(exec_time_counter());
+ int64_t rf_time = 0;
+ for (auto& dep : _filter_dependencies) {
+ rf_time += dep->watcher_elapse_time();
+ }
+ COUNTER_SET(_wait_for_rf_timer, rf_time);
+
+ return Base::close(state);
+}
+
Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
vectorized::Block*
block, bool* eos) {
//auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 3af8c5507bd..fd1f6f2c033 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -104,16 +104,29 @@ public:
Status open(RuntimeState* state) override {
RETURN_IF_ERROR(Base::open(state));
- RETURN_IF_ERROR(_acquire_runtime_filter());
+ RETURN_IF_ERROR(_acquire_runtime_filter(true));
return Status::OK();
}
+ Status close(RuntimeState* state) override;
friend class MultiCastDataStreamerSourceOperatorX;
- RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); }
+ std::vector<Dependency*> filter_dependencies() override {
+ if (_filter_dependencies.empty()) {
+ return {};
+ }
+ std::vector<Dependency*> res;
+ res.resize(_filter_dependencies.size());
+ for (size_t i = 0; i < _filter_dependencies.size(); i++) {
+ res[i] = _filter_dependencies[i].get();
+ }
+ return res;
+ }
private:
vectorized::VExprContextSPtrs _output_expr_contexts;
- std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
+ std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
+
+ RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
};
class MultiCastDataStreamerSourceOperatorX final
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 2f21b1626a5..4218ac1308d 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -91,14 +91,6 @@ std::string ScanOperator::debug_string() const {
return; \
}
-template <typename Derived>
-ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase*
parent)
- : ScanLocalStateBase(state, parent) {
- _filter_dependency = std::make_shared<RuntimeFilterDependency>(
- parent->operator_id(), parent->node_id(), parent->get_name() +
"_FILTER_DEPENDENCY",
- state->get_query_ctx());
-}
-
template <typename Derived>
bool ScanLocalState<Derived>::ready_to_read() {
return !_scanner_ctx->empty_in_queue(0);
@@ -133,7 +125,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
}
// init profile for runtime filter
RuntimeFilterConsumer::_init_profile(profile());
- init_runtime_filter_dependency(_filter_dependency.get());
+ init_runtime_filter_dependency(_filter_dependencies, p.operator_id(),
p.node_id(),
+ p.get_name() + "_FILTER_DEPENDENCY");
// 1: running at not pipeline mode will init profile.
// 2: the scan node should create scanner at pipeline mode will init
profile.
@@ -156,7 +149,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
if (_opened) {
return Status::OK();
}
- RETURN_IF_ERROR(_acquire_runtime_filter());
+ RETURN_IF_ERROR(_acquire_runtime_filter(true));
RETURN_IF_ERROR(_process_conjuncts());
auto status = _eos ? Status::OK() : _prepare_scanners();
@@ -1412,7 +1405,11 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
return Status::OK();
}
COUNTER_UPDATE(exec_time_counter(),
_scan_dependency->watcher_elapse_time());
- COUNTER_UPDATE(exec_time_counter(),
_filter_dependency->watcher_elapse_time());
+ int64_t rf_time = 0;
+ for (auto& dep : _filter_dependencies) {
+ rf_time += dep->watcher_elapse_time();
+ }
+ COUNTER_UPDATE(exec_time_counter(), rf_time);
SCOPED_TIMER(_close_timer);
SCOPED_TIMER(exec_time_counter());
@@ -1421,7 +1418,7 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
}
std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->watcher_elapse_time());
- COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
+ COUNTER_SET(_wait_for_rf_timer, rf_time);
return PipelineXLocalState<>::close(state);
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index a22a00da529..e941f8ce969 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -134,7 +134,8 @@ class ScanOperatorX;
template <typename Derived>
class ScanLocalState : public ScanLocalStateBase {
ENABLE_FACTORY_CREATOR(ScanLocalState);
- ScanLocalState(RuntimeState* state, OperatorXBase* parent);
+ ScanLocalState(RuntimeState* state, OperatorXBase* parent)
+ : ScanLocalStateBase(state, parent) {}
~ScanLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
@@ -165,7 +166,17 @@ class ScanLocalState : public ScanLocalStateBase {
int64_t get_push_down_count() override;
- RuntimeFilterDependency* filterdependency() override { return
_filter_dependency.get(); };
+ std::vector<Dependency*> filter_dependencies() override {
+ if (_filter_dependencies.empty()) {
+ return {};
+ }
+ std::vector<Dependency*> res;
+ res.resize(_filter_dependencies.size());
+ for (size_t i = 0; i < _filter_dependencies.size(); i++) {
+ res[i] = _filter_dependencies[i].get();
+ }
+ return res;
+ }
std::vector<Dependency*> dependencies() const override { return
{_scan_dependency.get()}; }
@@ -364,7 +375,7 @@ protected:
std::mutex _block_lock;
- std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
+ std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
// ScanLocalState owns the ownership of scanner, scanner context only has
its weakptr
std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index 016410b54bb..73f089f4c1b 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -89,20 +89,6 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask*
task) {
return ready ? nullptr : this;
}
-Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
- if (!_blocked_by_rf) {
- return nullptr;
- }
- std::unique_lock<std::mutex> lc(_task_lock);
- if (*_blocked_by_rf && !_is_cancelled()) {
- if (LIKELY(task)) {
- _add_block_task(task);
- }
- return this;
- }
- return nullptr;
-}
-
std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int
indentation_level) {
std::string RuntimeFilterDependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer,
- "{}{}: id={}, block task = {}, ready={}, _filters = {},
_blocked_by_rf = {}",
- std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
- _ready, _filters.load(), _blocked_by_rf ?
_blocked_by_rf->load() : false);
+ fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+ Dependency::debug_string(indentation_level),
_runtime_filter->formatted_state());
return fmt::to_string(debug_string_buffer);
}
-bool RuntimeFilterTimer::has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto ready = _ready.load() || _is_cancelled();
+ if (!ready && task) {
+ _add_block_task(task);
+ task->_blocked_dep = this;
+ }
+ return ready ? nullptr : this;
}
void RuntimeFilterTimer::call_timeout() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_ready) {
- return;
- }
- _call_timeout = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
void RuntimeFilterTimer::call_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- if (_call_timeout) {
- return;
- }
- _call_ready = true;
- if (_parent) {
- _parent->sub_filters(_filter_id);
- }
- _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
- std::unique_lock<std::mutex> lc(_lock);
- DCHECK(!_call_timeout);
- if (!_call_ready) {
- _parent->sub_filters(_filter_id);
- }
+ _parent->set_ready();
}
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
- const auto filter_id = runtime_filter->filter_id();
- ;
- _filters++;
- _filter_ready_map[filter_id] = false;
- int64_t registration_time = runtime_filter->registration_time();
- int32 wait_time_ms = runtime_filter->wait_time_ms();
- auto filter_timer = std::make_shared<RuntimeFilterTimer>(
- filter_id, registration_time, wait_time_ms,
-
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
- runtime_filter->set_filter_timer(filter_timer);
-
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
+ while (!_stop) {
+ std::unique_lock<std::mutex> lk(cv_m);
-void RuntimeFilterDependency::sub_filters(int id) {
- std::vector<PipelineXTask*> local_block_task {};
- {
- std::lock_guard<std::mutex> lk(_task_lock);
- if (!_filter_ready_map[id]) {
- _filter_ready_map[id] = true;
- _filters--;
+ while (_que.empty() && !_stop) {
+ cv.wait_for(lk, std::chrono::seconds(3), [this] { return
!_que.empty() || _stop; });
+ }
+ if (_stop) {
+ break;
}
- if (_filters == 0) {
- _watcher.stop();
- {
- *_blocked_by_rf = false;
- local_block_task.swap(_blocked_task);
+ {
+ std::unique_lock<std::mutex> lc(_que_lock);
+ std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
+ for (auto& it : _que) {
+ if (it.use_count() == 1) {
+ // `use_count == 1` means this runtime filter has been
released
+ } else if (it->_parent->is_blocked_by(nullptr)) {
+ // This means runtime filter is not ready, so we call
timeout or continue to poll this timer.
+ int64_t ms_since_registration = MonotonicMillis() -
it->registration_time();
+ if (ms_since_registration > it->wait_time_ms()) {
+ it->call_timeout();
+ } else {
+ new_que.push_back(std::move(it));
+ }
+ }
}
+ new_que.swap(_que);
}
+ std::this_thread::sleep_for(std::chrono::milliseconds(interval));
}
- for (auto* task : local_block_task) {
- task->wake_up();
- }
+ _shutdown = true;
}
void LocalExchangeSharedState::sub_running_sink_operators() {
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index fe95c1c4470..c7f37881cd2 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -205,79 +205,41 @@ public:
};
class RuntimeFilterDependency;
+struct RuntimeFilterTimerQueue;
class RuntimeFilterTimer {
public:
- RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t
wait_time_ms,
+ RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
std::shared_ptr<RuntimeFilterDependency> parent)
- : _filter_id(filter_id),
- _parent(std::move(parent)),
+ : _parent(std::move(parent)),
_registration_time(registration_time),
_wait_time_ms(wait_time_ms) {}
+ // Called by runtime filter producer.
void call_ready();
+ // Called by RuntimeFilterTimerQueue which is responsible for checking if
this rf is timeout.
void call_timeout();
- void call_has_ready();
-
- // When the use count is equal to 1, only the timer queue still holds
ownership,
- // so there is no need to take any action.
- void call_has_release() {};
-
- bool has_ready();
-
int64_t registration_time() const { return _registration_time; }
int32_t wait_time_ms() const { return _wait_time_ms; }
private:
- int _filter_id = -1;
- bool _call_ready {};
- bool _call_timeout {};
- std::shared_ptr<RuntimeFilterDependency> _parent;
+ friend struct RuntimeFilterTimerQueue;
+ std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
std::mutex _lock;
const int64_t _registration_time;
const int32_t _wait_time_ms;
- bool _is_ready = false;
};
struct RuntimeFilterTimerQueue {
constexpr static int64_t interval = 10;
void run() { _thread.detach(); }
- void start() {
- while (!_stop) {
- std::unique_lock<std::mutex> lk(cv_m);
-
- cv.wait(lk, [this] { return !_que.empty() || _stop; });
- if (_stop) {
- break;
- }
- {
- std::unique_lock<std::mutex> lc(_que_lock);
- std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>>
new_que;
- for (auto& it : _que) {
- if (it.use_count() == 1) {
- it->call_has_release();
- } else if (it->has_ready()) {
- it->call_has_ready();
- } else {
- int64_t ms_since_registration = MonotonicMillis() -
it->registration_time();
- if (ms_since_registration > it->wait_time_ms()) {
- it->call_timeout();
- } else {
- new_que.push_back(std::move(it));
- }
- }
- }
- new_que.swap(_que);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(interval));
- }
- _shutdown = true;
- }
+ void start();
void stop() {
_stop = true;
cv.notify_all();
+ wait_for_shutdown();
}
void wait_for_shutdown() const {
@@ -286,7 +248,7 @@ struct RuntimeFilterTimerQueue {
}
}
- ~RuntimeFilterTimerQueue() { wait_for_shutdown(); }
+ ~RuntimeFilterTimerQueue() = default;
RuntimeFilterTimerQueue() { _thread =
std::thread(&RuntimeFilterTimerQueue::start, this); }
void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>
filter) { push(filter); }
@@ -307,21 +269,15 @@ struct RuntimeFilterTimerQueue {
class RuntimeFilterDependency final : public Dependency {
public:
- RuntimeFilterDependency(int id, int node_id, std::string name,
QueryContext* query_ctx)
- : Dependency(id, node_id, name, query_ctx) {}
- Dependency* is_blocked_by(PipelineXTask* task) override;
- void add_filters(IRuntimeFilter* runtime_filter);
- void sub_filters(int id);
- void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
- _blocked_by_rf = blocked_by_rf;
- }
-
+ RuntimeFilterDependency(int id, int node_id, std::string name,
QueryContext* query_ctx,
+ IRuntimeFilter* runtime_filter)
+ : Dependency(id, node_id, name, query_ctx),
_runtime_filter(runtime_filter) {}
std::string debug_string(int indentation_level = 0) override;
-protected:
- std::atomic_int _filters;
- phmap::flat_hash_map<int, bool> _filter_ready_map;
- std::shared_ptr<std::atomic_bool> _blocked_by_rf;
+ Dependency* is_blocked_by(PipelineXTask* task) override;
+
+private:
+ const IRuntimeFilter* _runtime_filter = nullptr;
};
struct AggSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/pipeline_x/operator.h
b/be/src/pipeline/pipeline_x/operator.h
index c375efb924d..da20b9885a8 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -102,7 +102,7 @@ public:
// override in Scan
virtual Dependency* finishdependency() { return nullptr; }
// override in Scan MultiCastSink
- virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
+ virtual std::vector<Dependency*> filter_dependencies() { return {}; }
std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return
_query_statistics; }
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index da5da2f0477..9d5338e7f5e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -139,7 +139,11 @@ Status PipelineXTask::_extract_dependencies() {
_finish_dependencies.push_back(fin_dep);
}
}
- { _filter_dependency =
_state->get_local_state(_source->operator_id())->filterdependency(); }
+ {
+ const auto& deps =
_state->get_local_state(_source->operator_id())->filter_dependencies();
+ std::copy(deps.begin(), deps.end(),
+ std::inserter(_filter_dependencies,
_filter_dependencies.end()));
+ }
return Status::OK();
}
@@ -189,22 +193,7 @@ Status PipelineXTask::_open() {
_dry_run = _sink->should_dry_run(_state);
for (auto& o : _operators) {
auto* local_state = _state->get_local_state(o->operator_id());
- for (size_t i = 0; i < 2; i++) {
- auto st = local_state->open(_state);
- if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
- DCHECK(_filter_dependency);
- _blocked_dep = _filter_dependency->is_blocked_by(this);
- if (_blocked_dep) {
- set_state(PipelineTaskState::BLOCKED_FOR_RF);
- RETURN_IF_ERROR(st);
- } else if (i == 1) {
- return Status::InternalError("Unknown RF error, task was
blocked by RF twice");
- }
- } else {
- RETURN_IF_ERROR(st);
- break;
- }
- }
+ RETURN_IF_ERROR(local_state->open(_state));
}
RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
_opened = true;
@@ -234,15 +223,15 @@ Status PipelineXTask::execute(bool* eos) {
set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
return Status::OK();
}
+ if (_runtime_filter_blocked_dependency() != nullptr) {
+ set_state(PipelineTaskState::BLOCKED_FOR_RF);
+ return Status::OK();
+ }
// The status must be runnable
if (!_opened) {
{
SCOPED_RAW_TIMER(&time_spent);
- auto st = _open();
- if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
- return Status::OK();
- }
- RETURN_IF_ERROR(st);
+ RETURN_IF_ERROR(_open());
}
if (!source_can_read()) {
set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
@@ -396,7 +385,7 @@ std::string PipelineXTask::debug_string() {
if (_finished) {
return fmt::to_string(debug_string_buffer);
}
- fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n");
+
size_t i = 0;
for (; i < _read_dependencies.size(); i++) {
fmt::format_to(debug_string_buffer, "{}. {}\n", i,
@@ -409,10 +398,10 @@ std::string PipelineXTask::debug_string() {
_write_dependencies[j]->debug_string(i + 1));
}
- if (_filter_dependency) {
- fmt::format_to(debug_string_buffer, "Runtime Filter Dependency
Information: \n");
- fmt::format_to(debug_string_buffer, "{}. {}\n", i,
_filter_dependency->debug_string(1));
- i++;
+ fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency
Information: \n");
+ for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) {
+ fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+ _filter_dependencies[j]->debug_string(i + 1));
}
fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index c754af645ef..a89df75fc9b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -155,6 +155,7 @@ public:
}
private:
+ friend class RuntimeFilterDependency;
Dependency* _write_blocked_dependency() {
for (auto* op_dep : _write_dependencies) {
_blocked_dep = op_dep->is_blocked_by(this);
@@ -188,6 +189,17 @@ private:
return nullptr;
}
+ Dependency* _runtime_filter_blocked_dependency() {
+ for (auto* op_dep : _filter_dependencies) {
+ _blocked_dep = op_dep->is_blocked_by(this);
+ if (_blocked_dep != nullptr) {
+ _blocked_dep->start_watcher();
+ return _blocked_dep;
+ }
+ }
+ return nullptr;
+ }
+
Status _extract_dependencies();
void set_close_pipeline_time() override {}
void _init_profile() override;
@@ -202,7 +214,7 @@ private:
std::vector<Dependency*> _read_dependencies;
std::vector<Dependency*> _write_dependencies;
std::vector<Dependency*> _finish_dependencies;
- RuntimeFilterDependency* _filter_dependency;
+ std::vector<Dependency*> _filter_dependencies;
// All shared states of this pipeline task.
std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 480521f58d3..df1b166c5fa 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -34,6 +34,7 @@
#include "common/status.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_x/operator.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/exec_env.h"
#include "runtime/load_path_mgr.h"
#include "runtime/memory/mem_tracker_limiter.h"
@@ -543,4 +544,5 @@ Status RuntimeState::register_consumer_runtime_filter(const
doris::TRuntimeFilte
bool RuntimeState::is_nereids() const {
return _query_ctx->is_nereids();
}
+
} // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 79b82b94a11..e6ffce6c81d 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -50,6 +50,7 @@ namespace pipeline {
class PipelineXLocalStateBase;
class PipelineXSinkLocalStateBase;
class PipelineXFragmentContext;
+class PipelineXTask;
} // namespace pipeline
class DescriptorTbl;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 097df801615..2913fad3d8d 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -17,6 +17,8 @@
#include "vec/exec/runtime_filter_consumer.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
+
namespace doris::vectorized {
RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
@@ -75,36 +77,57 @@ bool
RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
}
void RuntimeFilterConsumer::init_runtime_filter_dependency(
- doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
- _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name) {
+ runtime_filter_dependencies.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
- _runtime_filter_dependency->add_filters(runtime_filter);
+ runtime_filter_dependencies[i] =
std::make_shared<pipeline::RuntimeFilterDependency>(
+ id, node_id, name, _state->get_query_ctx(), runtime_filter);
+ _runtime_filter_ctxs[i].runtime_filter_dependency =
runtime_filter_dependencies[i].get();
+ auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+ runtime_filter->registration_time(),
runtime_filter->wait_time_ms(),
+ runtime_filter_dependencies[i]);
+ runtime_filter->set_filter_timer(filter_timer);
+
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
}
}
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
SCOPED_TIMER(_acquire_runtime_filter_timer);
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
IRuntimeFilter* runtime_filter =
_runtime_filter_ctxs[i].runtime_filter;
- bool ready = runtime_filter->is_ready();
- if (!ready) {
- ready = runtime_filter->await();
- }
- if (ready && !_runtime_filter_ctxs[i].apply_mark) {
- RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs,
vexprs, false));
- _runtime_filter_ctxs[i].apply_mark = true;
- } else if (runtime_filter->current_state() ==
RuntimeFilterState::NOT_READY &&
- !_runtime_filter_ctxs[i].apply_mark) {
- *_blocked_by_rf = true;
- } else if (!_runtime_filter_ctxs[i].apply_mark) {
- DCHECK(runtime_filter->current_state() !=
RuntimeFilterState::NOT_READY);
- _is_all_rf_applied = false;
+ if (pipeline_x) {
+ runtime_filter->update_state();
+ if (runtime_filter->is_ready() &&
!_runtime_filter_ctxs[i].apply_mark) {
+ // Runtime filter has been applied in open phase.
+
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
+ _runtime_filter_ctxs[i].apply_mark = true;
+ } else if (!_runtime_filter_ctxs[i].apply_mark) {
+ // Runtime filter is timeout.
+ _is_all_rf_applied = false;
+ }
+ } else {
+ bool ready = runtime_filter->is_ready();
+ if (!ready) {
+ ready = runtime_filter->await();
+ }
+ if (ready && !_runtime_filter_ctxs[i].apply_mark) {
+
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
+ _runtime_filter_ctxs[i].apply_mark = true;
+ } else if (runtime_filter->current_state() ==
RuntimeFilterState::NOT_READY &&
+ !_runtime_filter_ctxs[i].apply_mark) {
+ *_blocked_by_rf = true;
+ } else if (!_runtime_filter_ctxs[i].apply_mark) {
+ DCHECK(runtime_filter->current_state() !=
RuntimeFilterState::NOT_READY);
+ _is_all_rf_applied = false;
+ }
}
}
RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
- if (*_blocked_by_rf) {
+ if (!pipeline_x && *_blocked_by_rf) {
return Status::WaitForRf("Runtime filters are neither not ready nor
timeout");
}
diff --git a/be/src/vec/exec/runtime_filter_consumer.h
b/be/src/vec/exec/runtime_filter_consumer.h
index 86609624be6..61fdf13cd8b 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -38,13 +38,16 @@ public:
bool runtime_filters_are_ready_or_timeout();
- void
init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+ void init_runtime_filter_dependency(
+ std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+ runtime_filter_dependencies,
+ const int id, const int node_id, const std::string& name);
protected:
// Register and get all runtime filters at Init phase.
Status _register_runtime_filter(bool need_local_merge);
// Get all arrived runtime filters at Open phase.
- Status _acquire_runtime_filter();
+ Status _acquire_runtime_filter(bool pipeline_x);
// Append late-arrival runtime filters to the vconjunct_ctx.
Status _append_rf_into_conjuncts(const
std::vector<vectorized::VRuntimeFilterPtr>& vexprs);
@@ -58,6 +61,7 @@ protected:
// set to true if this runtime filter is already applied to
vconjunct_ctx_ptr
bool apply_mark = false;
IRuntimeFilter* runtime_filter = nullptr;
+ pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
};
std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 5f8e6d8aa4c..282d6f1182e 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -187,7 +187,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
}
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));
- RETURN_IF_ERROR(_acquire_runtime_filter());
+ RETURN_IF_ERROR(_acquire_runtime_filter(false));
RETURN_IF_ERROR(_process_conjuncts());
if (_is_pipeline_scan) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]