github-actions[bot] commented on code in PR #26078:
URL: https://github.com/apache/doris/pull/26078#discussion_r1375738080


##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -326,4 +330,107 @@
     return Status::OK();
 }
 
+bool RuntimeFilterTimer::has_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    return _runtime_filter->is_ready();
+}
+
+void RuntimeFilterTimer::call_timeout() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_ready) {
+        return;
+    }
+    _call_timeout = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+void RuntimeFilterTimer::call_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_timeout) {
+        return;
+    }
+    _call_ready = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+void RuntimeFilterTimer::call_has_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    DCHECK(!_call_timeout);
+    if (!_call_ready) {
+        _parent->sub_filters();
+    }
+}
+
+struct RuntimeFilterTimerQueue {
+    constexpr static int64_t interval = 50;
+    void start() {
+        while (true) {
+            std::unique_lock<std::mutex> lk(cv_m);
+
+            cv.wait(lk, [this] { return !_que.empty(); });
+            {
+                std::unique_lock<std::mutex> lc(_que_lock);
+                std::list<std::shared_ptr<RuntimeFilterTimer>> new_que;
+                for (auto& it : _que) {
+                    if (it->has_ready()) {
+                        it->call_has_ready();
+                    } else {
+                        int64_t ms_since_registration = MonotonicMillis() - 
it->registration_time();
+                        if (ms_since_registration > it->wait_time_ms()) {
+                            it->call_timeout();
+                        } else {
+                            new_que.push_back(std::move(it));
+                        }
+                    }
+                }
+                new_que.swap(_que);
+            }
+            std::this_thread::sleep_for(std::chrono::milliseconds(interval));
+        }
+    }
+    ~RuntimeFilterTimerQueue() { _thread.detach(); }
+    RuntimeFilterTimerQueue() { _thread = 
std::thread(&RuntimeFilterTimerQueue::start, this); }
+    static void push_filter_timer(std::shared_ptr<RuntimeFilterTimer> filter) {
+        static RuntimeFilterTimerQueue timer_que;
+
+        timer_que.push(filter);
+    }
+
+    void push(std::shared_ptr<RuntimeFilterTimer> filter) {
+        std::unique_lock<std::mutex> lc(_que_lock);
+        _que.push_back(filter);
+        cv.notify_all();
+    }
+
+    std::thread _thread;
+    std::condition_variable cv;
+    std::mutex cv_m;
+    std::mutex _que_lock;
+
+    std::list<std::shared_ptr<RuntimeFilterTimer>> _que;
+};
+
+void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {

Review Comment:
   warning: method 'add_filters' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:247:
   ```diff
   -     void add_filters(IRuntimeFilter* runtime_filter);
   +     static void add_filters(IRuntimeFilter* runtime_filter);
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -326,4 +330,107 @@ Status 
HashJoinDependency::extract_join_column(vectorized::Block& block,
     return Status::OK();
 }
 
+bool RuntimeFilterTimer::has_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    return _runtime_filter->is_ready();
+}
+
+void RuntimeFilterTimer::call_timeout() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_ready) {
+        return;
+    }
+    _call_timeout = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+void RuntimeFilterTimer::call_ready() {
+    std::unique_lock<std::mutex> lc(_lock);
+    if (_call_timeout) {
+        return;
+    }
+    _call_ready = true;
+    if (_parent) {
+        _parent->sub_filters();
+    }
+}
+
+void RuntimeFilterTimer::call_has_ready() {

Review Comment:
   warning: method 'call_has_ready' can be made const 
[readability-make-member-function-const]
   
   be/src/pipeline/pipeline_x/dependency.h:216:
   ```diff
   -     void call_has_ready();
   +     void call_has_ready() const;
   ```
   
   ```suggestion
   void RuntimeFilterTimer::call_has_ready() const {
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to