This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a8232c67f91 [pipelineX](runtime filter) Fix task timeout caused by runtime filter (#33332) (#33369)
a8232c67f91 is described below

commit a8232c67f91bf66200227fa18a21007f38235a01
Author: Gabriel <[email protected]>
AuthorDate: Mon Apr 8 16:30:32 2024 +0800

    [pipelineX](runtime filter) Fix task timeout caused by runtime filter (#33332) (#33369)
---
 be/src/exprs/runtime_filter.cpp                    |  23 ++++-
 be/src/exprs/runtime_filter.h                      |   4 +-
 .../exec/multi_cast_data_stream_source.cpp         |  27 +++--
 .../pipeline/exec/multi_cast_data_stream_source.h  |  19 +++-
 be/src/pipeline/exec/scan_operator.cpp             |  21 ++--
 be/src/pipeline/exec/scan_operator.h               |  17 +++-
 be/src/pipeline/pipeline_x/dependency.cpp          | 112 +++++++--------------
 be/src/pipeline/pipeline_x/dependency.h            |  78 ++++----------
 be/src/pipeline/pipeline_x/operator.h              |   2 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  43 +++-----
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |  14 ++-
 be/src/runtime/runtime_state.cpp                   |   2 +
 be/src/runtime/runtime_state.h                     |   1 +
 be/src/vec/exec/runtime_filter_consumer.cpp        |  59 +++++++----
 be/src/vec/exec/runtime_filter_consumer.h          |   8 +-
 be/src/vec/exec/scan/vscan_node.cpp                |   2 +-
 16 files changed, 218 insertions(+), 214 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f680db5adee..1c5c3f7d4a2 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1095,7 +1095,7 @@ Status 
IRuntimeFilter::get_push_expr_ctxs(std::list<vectorized::VExprContextSPtr
         _set_push_down(!is_late_arrival);
         RETURN_IF_ERROR(_wrapper->get_push_exprs(probe_ctxs, push_exprs, 
_probe_expr));
     }
-    _profile->add_info_string("Info", _format_status());
+    _profile->add_info_string("Info", formatted_state());
     // The runtime filter is pushed down, adding filtering information.
     auto* expr_filtered_rows_counter = ADD_COUNTER(_profile, 
"expr_filtered_rows", TUnit::UNIT);
     auto* expr_input_rows_counter = ADD_COUNTER(_profile, "expr_input_rows", 
TUnit::UNIT);
@@ -1148,6 +1148,23 @@ bool IRuntimeFilter::await() {
     return true;
 }
 
+void IRuntimeFilter::update_state() {
+    DCHECK(is_consumer());
+    auto execution_timeout = _state->execution_timeout * 1000;
+    auto runtime_filter_wait_time_ms = _state->runtime_filter_wait_time_ms;
+    // bitmap filter is precise filter and only filter once, so it must be 
applied.
+    int64_t wait_times_ms = _wrapper->get_real_type() == 
RuntimeFilterType::BITMAP_FILTER
+                                    ? execution_timeout
+                                    : runtime_filter_wait_time_ms;
+    auto expected = _rf_state_atomic.load(std::memory_order_acquire);
+    DCHECK(_enable_pipeline_exec);
+    // In pipelineX, runtime filters will be ready or timeout before open 
phase.
+    if (expected == RuntimeFilterState::NOT_READY) {
+        DCHECK(MonotonicMillis() - registration_time_ >= wait_times_ms);
+        _rf_state_atomic = RuntimeFilterState::TIME_OUT;
+    }
+}
+
 // NOTE: Wait infinitely will not make scan task wait really forever.
 // Because BlockTaskSchedule will make it run when query is timedout.
 bool IRuntimeFilter::wait_infinitely() const {
@@ -1236,7 +1253,7 @@ void IRuntimeFilter::set_ignored(const std::string& msg) {
     _wrapper->_ignored_msg = msg;
 }
 
-std::string IRuntimeFilter::_format_status() const {
+std::string IRuntimeFilter::formatted_state() const {
     return fmt::format(
             "[IsPushDown = {}, RuntimeFilterState = {}, IgnoredMsg = {}, 
HasRemoteTarget = {}, "
             "HasLocalTarget = {}]",
@@ -1411,7 +1428,7 @@ void IRuntimeFilter::init_profile(RuntimeProfile* 
parent_profile) {
     } else {
         _profile_init = true;
         parent_profile->add_child(_profile.get(), true, nullptr);
-        _profile->add_info_string("Info", _format_status());
+        _profile->add_info_string("Info", formatted_state());
     }
 }
 
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 620d61ae564..ff825523ae1 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -263,6 +263,7 @@ public:
     // This function will wait at most 
config::runtime_filter_shuffle_wait_time_ms
     // if return true , filter is ready to use
     bool await();
+    void update_state();
     // this function will be called if a runtime filter sent by rpc
     // it will notify all wait threads
     void signal();
@@ -355,6 +356,7 @@ public:
     int64_t registration_time() const { return registration_time_; }
 
     void set_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer>);
+    std::string formatted_state() const;
 
 protected:
     // serialize _wrapper to protobuf
@@ -373,8 +375,6 @@ protected:
 
     void _set_push_down(bool push_down) { _is_push_down = push_down; }
 
-    std::string _format_status() const;
-
     std::string _get_explain_state_string() const {
         if (_enable_pipeline_exec) {
             return _rf_state_atomic.load(std::memory_order_acquire) == 
RuntimeFilterState::READY
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 049905c5cc4..c40af83bd58 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -77,7 +77,7 @@ Status 
MultiCastDataStreamerSourceOperator::open(doris::RuntimeState* state) {
     if (_t_data_stream_sink.__isset.conjuncts) {
         RETURN_IF_ERROR(vectorized::VExpr::open(_conjuncts, state));
     }
-    return _acquire_runtime_filter();
+    return _acquire_runtime_filter(false);
 }
 
 bool 
MultiCastDataStreamerSourceOperator::runtime_filters_are_ready_or_timeout() {
@@ -129,10 +129,7 @@ 
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(Runtime
           
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
                                             parent->runtime_filter_descs(),
                                             
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
-    _filter_dependency = std::make_shared<RuntimeFilterDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY",
-            state->get_query_ctx());
-};
+}
 
 Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
@@ -145,12 +142,30 @@ Status 
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     for (size_t i = 0; i < p._output_expr_contexts.size(); i++) {
         RETURN_IF_ERROR(p._output_expr_contexts[i]->clone(state, 
_output_expr_contexts[i]));
     }
+    _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
-    init_runtime_filter_dependency(_filter_dependency.get());
+    init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), 
p.node_id(),
+                                   p.get_name() + "_FILTER_DEPENDENCY");
     return Status::OK();
 }
 
+Status MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    SCOPED_TIMER(_close_timer);
+    SCOPED_TIMER(exec_time_counter());
+    int64_t rf_time = 0;
+    for (auto& dep : _filter_dependencies) {
+        rf_time += dep->watcher_elapse_time();
+    }
+    COUNTER_SET(_wait_for_rf_timer, rf_time);
+
+    return Base::close(state);
+}
+
 Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
                                                        vectorized::Block* 
block, bool* eos) {
     //auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h 
b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 3af8c5507bd..fd1f6f2c033 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -104,16 +104,29 @@ public:
 
     Status open(RuntimeState* state) override {
         RETURN_IF_ERROR(Base::open(state));
-        RETURN_IF_ERROR(_acquire_runtime_filter());
+        RETURN_IF_ERROR(_acquire_runtime_filter(true));
         return Status::OK();
     }
+    Status close(RuntimeState* state) override;
     friend class MultiCastDataStreamerSourceOperatorX;
 
-    RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); }
+    std::vector<Dependency*> filter_dependencies() override {
+        if (_filter_dependencies.empty()) {
+            return {};
+        }
+        std::vector<Dependency*> res;
+        res.resize(_filter_dependencies.size());
+        for (size_t i = 0; i < _filter_dependencies.size(); i++) {
+            res[i] = _filter_dependencies[i].get();
+        }
+        return res;
+    }
 
 private:
     vectorized::VExprContextSPtrs _output_expr_contexts;
-    std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
+    std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
+
+    RuntimeProfile::Counter* _wait_for_rf_timer = nullptr;
 };
 
 class MultiCastDataStreamerSourceOperatorX final
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 2f21b1626a5..4218ac1308d 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -91,14 +91,6 @@ std::string ScanOperator::debug_string() const {
         return;                                                     \
     }
 
-template <typename Derived>
-ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* 
parent)
-        : ScanLocalStateBase(state, parent) {
-    _filter_dependency = std::make_shared<RuntimeFilterDependency>(
-            parent->operator_id(), parent->node_id(), parent->get_name() + 
"_FILTER_DEPENDENCY",
-            state->get_query_ctx());
-}
-
 template <typename Derived>
 bool ScanLocalState<Derived>::ready_to_read() {
     return !_scanner_ctx->empty_in_queue(0);
@@ -133,7 +125,8 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     }
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
-    init_runtime_filter_dependency(_filter_dependency.get());
+    init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), 
p.node_id(),
+                                   p.get_name() + "_FILTER_DEPENDENCY");
 
     // 1: running at not pipeline mode will init profile.
     // 2: the scan node should create scanner at pipeline mode will init 
profile.
@@ -156,7 +149,7 @@ Status ScanLocalState<Derived>::open(RuntimeState* state) {
     if (_opened) {
         return Status::OK();
     }
-    RETURN_IF_ERROR(_acquire_runtime_filter());
+    RETURN_IF_ERROR(_acquire_runtime_filter(true));
     RETURN_IF_ERROR(_process_conjuncts());
 
     auto status = _eos ? Status::OK() : _prepare_scanners();
@@ -1412,7 +1405,11 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
         return Status::OK();
     }
     COUNTER_UPDATE(exec_time_counter(), 
_scan_dependency->watcher_elapse_time());
-    COUNTER_UPDATE(exec_time_counter(), 
_filter_dependency->watcher_elapse_time());
+    int64_t rf_time = 0;
+    for (auto& dep : _filter_dependencies) {
+        rf_time += dep->watcher_elapse_time();
+    }
+    COUNTER_UPDATE(exec_time_counter(), rf_time);
     SCOPED_TIMER(_close_timer);
 
     SCOPED_TIMER(exec_time_counter());
@@ -1421,7 +1418,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* 
state) {
     }
     std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
     COUNTER_SET(_wait_for_dependency_timer, 
_scan_dependency->watcher_elapse_time());
-    COUNTER_SET(_wait_for_rf_timer, _filter_dependency->watcher_elapse_time());
+    COUNTER_SET(_wait_for_rf_timer, rf_time);
 
     return PipelineXLocalState<>::close(state);
 }
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index a22a00da529..e941f8ce969 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -134,7 +134,8 @@ class ScanOperatorX;
 template <typename Derived>
 class ScanLocalState : public ScanLocalStateBase {
     ENABLE_FACTORY_CREATOR(ScanLocalState);
-    ScanLocalState(RuntimeState* state, OperatorXBase* parent);
+    ScanLocalState(RuntimeState* state, OperatorXBase* parent)
+            : ScanLocalStateBase(state, parent) {}
     ~ScanLocalState() override = default;
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
@@ -165,7 +166,17 @@ class ScanLocalState : public ScanLocalStateBase {
 
     int64_t get_push_down_count() override;
 
-    RuntimeFilterDependency* filterdependency() override { return 
_filter_dependency.get(); };
+    std::vector<Dependency*> filter_dependencies() override {
+        if (_filter_dependencies.empty()) {
+            return {};
+        }
+        std::vector<Dependency*> res;
+        res.resize(_filter_dependencies.size());
+        for (size_t i = 0; i < _filter_dependencies.size(); i++) {
+            res[i] = _filter_dependencies[i].get();
+        }
+        return res;
+    }
 
     std::vector<Dependency*> dependencies() const override { return 
{_scan_dependency.get()}; }
 
@@ -364,7 +375,7 @@ protected:
 
     std::mutex _block_lock;
 
-    std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
+    std::vector<std::shared_ptr<RuntimeFilterDependency>> _filter_dependencies;
 
     // ScanLocalState owns the ownership of scanner, scanner context only has 
its weakptr
     std::list<std::shared_ptr<vectorized::ScannerDelegate>> _scanners;
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 016410b54bb..73f089f4c1b 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -89,20 +89,6 @@ Dependency* FinishDependency::is_blocked_by(PipelineXTask* 
task) {
     return ready ? nullptr : this;
 }
 
-Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
-    if (!_blocked_by_rf) {
-        return nullptr;
-    }
-    std::unique_lock<std::mutex> lc(_task_lock);
-    if (*_blocked_by_rf && !_is_cancelled()) {
-        if (LIKELY(task)) {
-            _add_block_task(task);
-        }
-        return this;
-    }
-    return nullptr;
-}
-
 std::string Dependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int 
indentation_level) {
 
 std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block task = {}, ready={}, _filters = {}, 
_blocked_by_rf = {}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
-                   _ready, _filters.load(), _blocked_by_rf ? 
_blocked_by_rf->load() : false);
+    fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+                   Dependency::debug_string(indentation_level), 
_runtime_filter->formatted_state());
     return fmt::to_string(debug_string_buffer);
 }
 
-bool RuntimeFilterTimer::has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready = _ready.load() || _is_cancelled();
+    if (!ready && task) {
+        _add_block_task(task);
+        task->_blocked_dep = this;
+    }
+    return ready ? nullptr : this;
 }
 
 void RuntimeFilterTimer::call_timeout() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_ready) {
-        return;
-    }
-    _call_timeout = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
 void RuntimeFilterTimer::call_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_timeout) {
-        return;
-    }
-    _call_ready = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
-    _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    DCHECK(!_call_timeout);
-    if (!_call_ready) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
-    const auto filter_id = runtime_filter->filter_id();
-    ;
-    _filters++;
-    _filter_ready_map[filter_id] = false;
-    int64_t registration_time = runtime_filter->registration_time();
-    int32 wait_time_ms = runtime_filter->wait_time_ms();
-    auto filter_timer = std::make_shared<RuntimeFilterTimer>(
-            filter_id, registration_time, wait_time_ms,
-            
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
-    runtime_filter->set_filter_timer(filter_timer);
-    
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {
+    while (!_stop) {
+        std::unique_lock<std::mutex> lk(cv_m);
 
-void RuntimeFilterDependency::sub_filters(int id) {
-    std::vector<PipelineXTask*> local_block_task {};
-    {
-        std::lock_guard<std::mutex> lk(_task_lock);
-        if (!_filter_ready_map[id]) {
-            _filter_ready_map[id] = true;
-            _filters--;
+        while (_que.empty() && !_stop) {
+            cv.wait_for(lk, std::chrono::seconds(3), [this] { return 
!_que.empty() || _stop; });
+        }
+        if (_stop) {
+            break;
         }
-        if (_filters == 0) {
-            _watcher.stop();
-            {
-                *_blocked_by_rf = false;
-                local_block_task.swap(_blocked_task);
+        {
+            std::unique_lock<std::mutex> lc(_que_lock);
+            std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> new_que;
+            for (auto& it : _que) {
+                if (it.use_count() == 1) {
+                    // `use_count == 1` means this runtime filter has been 
released
+                } else if (it->_parent->is_blocked_by(nullptr)) {
+                    // This means runtime filter is not ready, so we call 
timeout or continue to poll this timer.
+                    int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
+                    if (ms_since_registration > it->wait_time_ms()) {
+                        it->call_timeout();
+                    } else {
+                        new_que.push_back(std::move(it));
+                    }
+                }
             }
+            new_que.swap(_que);
         }
+        std::this_thread::sleep_for(std::chrono::milliseconds(interval));
     }
-    for (auto* task : local_block_task) {
-        task->wake_up();
-    }
+    _shutdown = true;
 }
 
 void LocalExchangeSharedState::sub_running_sink_operators() {
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index fe95c1c4470..c7f37881cd2 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -205,79 +205,41 @@ public:
 };
 
 class RuntimeFilterDependency;
+struct RuntimeFilterTimerQueue;
 class RuntimeFilterTimer {
 public:
-    RuntimeFilterTimer(int filter_id, int64_t registration_time, int32_t 
wait_time_ms,
+    RuntimeFilterTimer(int64_t registration_time, int32_t wait_time_ms,
                        std::shared_ptr<RuntimeFilterDependency> parent)
-            : _filter_id(filter_id),
-              _parent(std::move(parent)),
+            : _parent(std::move(parent)),
               _registration_time(registration_time),
               _wait_time_ms(wait_time_ms) {}
 
+    // Called by runtime filter producer.
     void call_ready();
 
+    // Called by RuntimeFilterTimerQueue which is responsible for checking if 
this rf is timeout.
     void call_timeout();
 
-    void call_has_ready();
-
-    // When the use count is equal to 1, only the timer queue still holds 
ownership,
-    // so there is no need to take any action.
-    void call_has_release() {};
-
-    bool has_ready();
-
     int64_t registration_time() const { return _registration_time; }
     int32_t wait_time_ms() const { return _wait_time_ms; }
 
 private:
-    int _filter_id = -1;
-    bool _call_ready {};
-    bool _call_timeout {};
-    std::shared_ptr<RuntimeFilterDependency> _parent;
+    friend struct RuntimeFilterTimerQueue;
+    std::shared_ptr<RuntimeFilterDependency> _parent = nullptr;
     std::mutex _lock;
     const int64_t _registration_time;
     const int32_t _wait_time_ms;
-    bool _is_ready = false;
 };
 
 struct RuntimeFilterTimerQueue {
     constexpr static int64_t interval = 10;
     void run() { _thread.detach(); }
-    void start() {
-        while (!_stop) {
-            std::unique_lock<std::mutex> lk(cv_m);
-
-            cv.wait(lk, [this] { return !_que.empty() || _stop; });
-            if (_stop) {
-                break;
-            }
-            {
-                std::unique_lock<std::mutex> lc(_que_lock);
-                std::list<std::shared_ptr<pipeline::RuntimeFilterTimer>> 
new_que;
-                for (auto& it : _que) {
-                    if (it.use_count() == 1) {
-                        it->call_has_release();
-                    } else if (it->has_ready()) {
-                        it->call_has_ready();
-                    } else {
-                        int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
-                        if (ms_since_registration > it->wait_time_ms()) {
-                            it->call_timeout();
-                        } else {
-                            new_que.push_back(std::move(it));
-                        }
-                    }
-                }
-                new_que.swap(_que);
-            }
-            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
-        }
-        _shutdown = true;
-    }
+    void start();
 
     void stop() {
         _stop = true;
         cv.notify_all();
+        wait_for_shutdown();
     }
 
     void wait_for_shutdown() const {
@@ -286,7 +248,7 @@ struct RuntimeFilterTimerQueue {
         }
     }
 
-    ~RuntimeFilterTimerQueue() { wait_for_shutdown(); }
+    ~RuntimeFilterTimerQueue() = default;
     RuntimeFilterTimerQueue() { _thread = 
std::thread(&RuntimeFilterTimerQueue::start, this); }
     void push_filter_timer(std::shared_ptr<pipeline::RuntimeFilterTimer> 
filter) { push(filter); }
 
@@ -307,21 +269,15 @@ struct RuntimeFilterTimerQueue {
 
 class RuntimeFilterDependency final : public Dependency {
 public:
-    RuntimeFilterDependency(int id, int node_id, std::string name, 
QueryContext* query_ctx)
-            : Dependency(id, node_id, name, query_ctx) {}
-    Dependency* is_blocked_by(PipelineXTask* task) override;
-    void add_filters(IRuntimeFilter* runtime_filter);
-    void sub_filters(int id);
-    void set_blocked_by_rf(std::shared_ptr<std::atomic_bool> blocked_by_rf) {
-        _blocked_by_rf = blocked_by_rf;
-    }
-
+    RuntimeFilterDependency(int id, int node_id, std::string name, 
QueryContext* query_ctx,
+                            IRuntimeFilter* runtime_filter)
+            : Dependency(id, node_id, name, query_ctx), 
_runtime_filter(runtime_filter) {}
     std::string debug_string(int indentation_level = 0) override;
 
-protected:
-    std::atomic_int _filters;
-    phmap::flat_hash_map<int, bool> _filter_ready_map;
-    std::shared_ptr<std::atomic_bool> _blocked_by_rf;
+    Dependency* is_blocked_by(PipelineXTask* task) override;
+
+private:
+    const IRuntimeFilter* _runtime_filter = nullptr;
 };
 
 struct AggSharedState : public BasicSharedState {
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index c375efb924d..da20b9885a8 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -102,7 +102,7 @@ public:
     // override in Scan
     virtual Dependency* finishdependency() { return nullptr; }
     //  override in Scan  MultiCastSink
-    virtual RuntimeFilterDependency* filterdependency() { return nullptr; }
+    virtual std::vector<Dependency*> filter_dependencies() { return {}; }
 
     std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return 
_query_statistics; }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index da5da2f0477..9d5338e7f5e 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -139,7 +139,11 @@ Status PipelineXTask::_extract_dependencies() {
             _finish_dependencies.push_back(fin_dep);
         }
     }
-    { _filter_dependency = 
_state->get_local_state(_source->operator_id())->filterdependency(); }
+    {
+        const auto& deps = 
_state->get_local_state(_source->operator_id())->filter_dependencies();
+        std::copy(deps.begin(), deps.end(),
+                  std::inserter(_filter_dependencies, 
_filter_dependencies.end()));
+    }
     return Status::OK();
 }
 
@@ -189,22 +193,7 @@ Status PipelineXTask::_open() {
     _dry_run = _sink->should_dry_run(_state);
     for (auto& o : _operators) {
         auto* local_state = _state->get_local_state(o->operator_id());
-        for (size_t i = 0; i < 2; i++) {
-            auto st = local_state->open(_state);
-            if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
-                DCHECK(_filter_dependency);
-                _blocked_dep = _filter_dependency->is_blocked_by(this);
-                if (_blocked_dep) {
-                    set_state(PipelineTaskState::BLOCKED_FOR_RF);
-                    RETURN_IF_ERROR(st);
-                } else if (i == 1) {
-                    return Status::InternalError("Unknown RF error, task was 
blocked by RF twice");
-                }
-            } else {
-                RETURN_IF_ERROR(st);
-                break;
-            }
-        }
+        RETURN_IF_ERROR(local_state->open(_state));
     }
     RETURN_IF_ERROR(_state->get_sink_local_state()->open(_state));
     _opened = true;
@@ -234,15 +223,15 @@ Status PipelineXTask::execute(bool* eos) {
         set_state(PipelineTaskState::BLOCKED_FOR_DEPENDENCY);
         return Status::OK();
     }
+    if (_runtime_filter_blocked_dependency() != nullptr) {
+        set_state(PipelineTaskState::BLOCKED_FOR_RF);
+        return Status::OK();
+    }
     // The status must be runnable
     if (!_opened) {
         {
             SCOPED_RAW_TIMER(&time_spent);
-            auto st = _open();
-            if (st.is<ErrorCode::PIP_WAIT_FOR_RF>()) {
-                return Status::OK();
-            }
-            RETURN_IF_ERROR(st);
+            RETURN_IF_ERROR(_open());
         }
         if (!source_can_read()) {
             set_state(PipelineTaskState::BLOCKED_FOR_SOURCE);
@@ -396,7 +385,7 @@ std::string PipelineXTask::debug_string() {
     if (_finished) {
         return fmt::to_string(debug_string_buffer);
     }
-    fmt::format_to(debug_string_buffer, "\nRead Dependency Information: \n");
+
     size_t i = 0;
     for (; i < _read_dependencies.size(); i++) {
         fmt::format_to(debug_string_buffer, "{}. {}\n", i,
@@ -409,10 +398,10 @@ std::string PipelineXTask::debug_string() {
                        _write_dependencies[j]->debug_string(i + 1));
     }
 
-    if (_filter_dependency) {
-        fmt::format_to(debug_string_buffer, "Runtime Filter Dependency 
Information: \n");
-        fmt::format_to(debug_string_buffer, "{}. {}\n", i, 
_filter_dependency->debug_string(1));
-        i++;
+    fmt::format_to(debug_string_buffer, "\nRuntime Filter Dependency 
Information: \n");
+    for (size_t j = 0; j < _filter_dependencies.size(); j++, i++) {
+        fmt::format_to(debug_string_buffer, "{}. {}\n", i,
+                       _filter_dependencies[j]->debug_string(i + 1));
     }
 
     fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n");
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index c754af645ef..a89df75fc9b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -155,6 +155,7 @@ public:
     }
 
 private:
+    friend class RuntimeFilterDependency;
     Dependency* _write_blocked_dependency() {
         for (auto* op_dep : _write_dependencies) {
             _blocked_dep = op_dep->is_blocked_by(this);
@@ -188,6 +189,17 @@ private:
         return nullptr;
     }
 
+    Dependency* _runtime_filter_blocked_dependency() {
+        for (auto* op_dep : _filter_dependencies) {
+            _blocked_dep = op_dep->is_blocked_by(this);
+            if (_blocked_dep != nullptr) {
+                _blocked_dep->start_watcher();
+                return _blocked_dep;
+            }
+        }
+        return nullptr;
+    }
+
     Status _extract_dependencies();
     void set_close_pipeline_time() override {}
     void _init_profile() override;
@@ -202,7 +214,7 @@ private:
     std::vector<Dependency*> _read_dependencies;
     std::vector<Dependency*> _write_dependencies;
     std::vector<Dependency*> _finish_dependencies;
-    RuntimeFilterDependency* _filter_dependency;
+    std::vector<Dependency*> _filter_dependencies;
 
     // All shared states of this pipeline task.
     std::map<int, std::shared_ptr<BasicSharedState>> _op_shared_states;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 480521f58d3..df1b166c5fa 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -34,6 +34,7 @@
 #include "common/status.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline_x/operator.h"
+#include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/load_path_mgr.h"
 #include "runtime/memory/mem_tracker_limiter.h"
@@ -543,4 +544,5 @@ Status RuntimeState::register_consumer_runtime_filter(const 
doris::TRuntimeFilte
 bool RuntimeState::is_nereids() const {
     return _query_ctx->is_nereids();
 }
+
 } // end namespace doris
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 79b82b94a11..e6ffce6c81d 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -50,6 +50,7 @@ namespace pipeline {
 class PipelineXLocalStateBase;
 class PipelineXSinkLocalStateBase;
 class PipelineXFragmentContext;
+class PipelineXTask;
 } // namespace pipeline
 
 class DescriptorTbl;
diff --git a/be/src/vec/exec/runtime_filter_consumer.cpp 
b/be/src/vec/exec/runtime_filter_consumer.cpp
index 097df801615..2913fad3d8d 100644
--- a/be/src/vec/exec/runtime_filter_consumer.cpp
+++ b/be/src/vec/exec/runtime_filter_consumer.cpp
@@ -17,6 +17,8 @@
 
 #include "vec/exec/runtime_filter_consumer.h"
 
+#include "pipeline/pipeline_x/pipeline_x_task.h"
+
 namespace doris::vectorized {
 
 RuntimeFilterConsumer::RuntimeFilterConsumer(const int32_t filter_id,
@@ -75,36 +77,57 @@ bool 
RuntimeFilterConsumer::runtime_filters_are_ready_or_timeout() {
 }
 
 void RuntimeFilterConsumer::init_runtime_filter_dependency(
-        doris::pipeline::RuntimeFilterDependency* _runtime_filter_dependency) {
-    _runtime_filter_dependency->set_blocked_by_rf(_blocked_by_rf);
+        std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                runtime_filter_dependencies,
+        const int id, const int node_id, const std::string& name) {
+    runtime_filter_dependencies.resize(_runtime_filter_descs.size());
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
-        _runtime_filter_dependency->add_filters(runtime_filter);
+        runtime_filter_dependencies[i] = 
std::make_shared<pipeline::RuntimeFilterDependency>(
+                id, node_id, name, _state->get_query_ctx(), runtime_filter);
+        _runtime_filter_ctxs[i].runtime_filter_dependency = 
runtime_filter_dependencies[i].get();
+        auto filter_timer = std::make_shared<pipeline::RuntimeFilterTimer>(
+                runtime_filter->registration_time(), 
runtime_filter->wait_time_ms(),
+                runtime_filter_dependencies[i]);
+        runtime_filter->set_filter_timer(filter_timer);
+        
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
     }
 }
 
-Status RuntimeFilterConsumer::_acquire_runtime_filter() {
+Status RuntimeFilterConsumer::_acquire_runtime_filter(bool pipeline_x) {
     SCOPED_TIMER(_acquire_runtime_filter_timer);
     std::vector<vectorized::VRuntimeFilterPtr> vexprs;
     for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
         IRuntimeFilter* runtime_filter = 
_runtime_filter_ctxs[i].runtime_filter;
-        bool ready = runtime_filter->is_ready();
-        if (!ready) {
-            ready = runtime_filter->await();
-        }
-        if (ready && !_runtime_filter_ctxs[i].apply_mark) {
-            RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, 
vexprs, false));
-            _runtime_filter_ctxs[i].apply_mark = true;
-        } else if (runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
-                   !_runtime_filter_ctxs[i].apply_mark) {
-            *_blocked_by_rf = true;
-        } else if (!_runtime_filter_ctxs[i].apply_mark) {
-            DCHECK(runtime_filter->current_state() != 
RuntimeFilterState::NOT_READY);
-            _is_all_rf_applied = false;
+        if (pipeline_x) {
+            runtime_filter->update_state();
+            if (runtime_filter->is_ready() && 
!_runtime_filter_ctxs[i].apply_mark) {
+                // Runtime filter has been applied in open phase.
+                
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
+                _runtime_filter_ctxs[i].apply_mark = true;
+            } else if (!_runtime_filter_ctxs[i].apply_mark) {
+                // Runtime filter is timeout.
+                _is_all_rf_applied = false;
+            }
+        } else {
+            bool ready = runtime_filter->is_ready();
+            if (!ready) {
+                ready = runtime_filter->await();
+            }
+            if (ready && !_runtime_filter_ctxs[i].apply_mark) {
+                
RETURN_IF_ERROR(runtime_filter->get_push_expr_ctxs(_probe_ctxs, vexprs, false));
+                _runtime_filter_ctxs[i].apply_mark = true;
+            } else if (runtime_filter->current_state() == 
RuntimeFilterState::NOT_READY &&
+                       !_runtime_filter_ctxs[i].apply_mark) {
+                *_blocked_by_rf = true;
+            } else if (!_runtime_filter_ctxs[i].apply_mark) {
+                DCHECK(runtime_filter->current_state() != 
RuntimeFilterState::NOT_READY);
+                _is_all_rf_applied = false;
+            }
         }
     }
     RETURN_IF_ERROR(_append_rf_into_conjuncts(vexprs));
-    if (*_blocked_by_rf) {
+    if (!pipeline_x && *_blocked_by_rf) {
         return Status::WaitForRf("Runtime filters are neither not ready nor 
timeout");
     }
 
diff --git a/be/src/vec/exec/runtime_filter_consumer.h 
b/be/src/vec/exec/runtime_filter_consumer.h
index 86609624be6..61fdf13cd8b 100644
--- a/be/src/vec/exec/runtime_filter_consumer.h
+++ b/be/src/vec/exec/runtime_filter_consumer.h
@@ -38,13 +38,16 @@ public:
 
     bool runtime_filters_are_ready_or_timeout();
 
-    void 
init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+    void init_runtime_filter_dependency(
+            std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                    runtime_filter_dependencies,
+            const int id, const int node_id, const std::string& name);
 
 protected:
     // Register and get all runtime filters at Init phase.
     Status _register_runtime_filter(bool need_local_merge);
     // Get all arrived runtime filters at Open phase.
-    Status _acquire_runtime_filter();
+    Status _acquire_runtime_filter(bool pipeline_x);
     // Append late-arrival runtime filters to the vconjunct_ctx.
     Status _append_rf_into_conjuncts(const 
std::vector<vectorized::VRuntimeFilterPtr>& vexprs);
 
@@ -58,6 +61,7 @@ protected:
         // set to true if this runtime filter is already applied to 
vconjunct_ctx_ptr
         bool apply_mark = false;
         IRuntimeFilter* runtime_filter = nullptr;
+        pipeline::RuntimeFilterDependency* runtime_filter_dependency = nullptr;
     };
 
     std::vector<RuntimeFilterContext> _runtime_filter_ctxs;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp 
b/be/src/vec/exec/scan/vscan_node.cpp
index 5f8e6d8aa4c..282d6f1182e 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -187,7 +187,7 @@ Status VScanNode::alloc_resource(RuntimeState* state) {
     }
     _output_tuple_desc = 
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     RETURN_IF_ERROR(ExecNode::alloc_resource(state));
-    RETURN_IF_ERROR(_acquire_runtime_filter());
+    RETURN_IF_ERROR(_acquire_runtime_filter(false));
     RETURN_IF_ERROR(_process_conjuncts());
 
     if (_is_pipeline_scan) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to