github-actions[bot] commented on code in PR #33369:
URL: https://github.com/apache/doris/pull/33369#discussion_r1555444133


##########
be/src/vec/exec/runtime_filter_consumer.h:
##########
@@ -38,13 +38,16 @@
 
     bool runtime_filters_are_ready_or_timeout();
 
-    void 
init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+    void init_runtime_filter_dependency(
+            std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                    runtime_filter_dependencies,
+            const int id, const int node_id, const std::string& name);

Review Comment:
   warning: parameter 'node_id' is const-qualified in the function declaration; 
const-qualification of parameters only has an effect in function definitions 
[readability-avoid-const-params-in-decls]
   
   ```suggestion
               const int id, int node_id, const std::string& name);
   ```
   



##########
be/src/pipeline/pipeline_x/pipeline_x_task.h:
##########
@@ -188,6 +189,17 @@ class PipelineXTask : public PipelineTask {
         return nullptr;
     }
 
+    Dependency* _runtime_filter_blocked_dependency() {

Review Comment:
   warning: method '_runtime_filter_blocked_dependency' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static Dependency* _runtime_filter_blocked_dependency() {
   ```
   



##########
be/src/vec/exec/runtime_filter_consumer.h:
##########
@@ -38,13 +38,16 @@ class RuntimeFilterConsumer {
 
     bool runtime_filters_are_ready_or_timeout();
 
-    void 
init_runtime_filter_dependency(doris::pipeline::RuntimeFilterDependency*);
+    void init_runtime_filter_dependency(
+            std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
+                    runtime_filter_dependencies,
+            const int id, const int node_id, const std::string& name);

Review Comment:
   warning: parameter 'id' is const-qualified in the function declaration; 
const-qualification of parameters only has an effect in function definitions 
[readability-avoid-const-params-in-decls]
   
   ```suggestion
               int id, const int node_id, const std::string& name);
   ```
   



##########
be/src/pipeline/pipeline_x/dependency.cpp:
##########
@@ -114,82 +100,60 @@ std::string Dependency::debug_string(int 
indentation_level) {
 
 std::string RuntimeFilterDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block task = {}, ready={}, _filters = {}, 
_blocked_by_rf = {}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
-                   _ready, _filters.load(), _blocked_by_rf ? 
_blocked_by_rf->load() : false);
+    fmt::format_to(debug_string_buffer, "{}, runtime filter: {}",
+                   Dependency::debug_string(indentation_level), 
_runtime_filter->formatted_state());
     return fmt::to_string(debug_string_buffer);
 }
 
-bool RuntimeFilterTimer::has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    return _is_ready;
+Dependency* RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
+    std::unique_lock<std::mutex> lc(_task_lock);
+    auto ready = _ready.load() || _is_cancelled();
+    if (!ready && task) {
+        _add_block_task(task);
+        task->_blocked_dep = this;
+    }
+    return ready ? nullptr : this;
 }
 
 void RuntimeFilterTimer::call_timeout() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_ready) {
-        return;
-    }
-    _call_timeout = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
 void RuntimeFilterTimer::call_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    if (_call_timeout) {
-        return;
-    }
-    _call_ready = true;
-    if (_parent) {
-        _parent->sub_filters(_filter_id);
-    }
-    _is_ready = true;
-}
-
-void RuntimeFilterTimer::call_has_ready() {
-    std::unique_lock<std::mutex> lc(_lock);
-    DCHECK(!_call_timeout);
-    if (!_call_ready) {
-        _parent->sub_filters(_filter_id);
-    }
+    _parent->set_ready();
 }
 
-void RuntimeFilterDependency::add_filters(IRuntimeFilter* runtime_filter) {
-    const auto filter_id = runtime_filter->filter_id();
-    ;
-    _filters++;
-    _filter_ready_map[filter_id] = false;
-    int64_t registration_time = runtime_filter->registration_time();
-    int32 wait_time_ms = runtime_filter->wait_time_ms();
-    auto filter_timer = std::make_shared<RuntimeFilterTimer>(
-            filter_id, registration_time, wait_time_ms,
-            
std::dynamic_pointer_cast<RuntimeFilterDependency>(shared_from_this()));
-    runtime_filter->set_filter_timer(filter_timer);
-    
ExecEnv::GetInstance()->runtime_filter_timer_queue()->push_filter_timer(filter_timer);
-}
+void RuntimeFilterTimerQueue::start() {

Review Comment:
   warning: method 'start' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/pipeline/pipeline_x/dependency.h:236:
   ```diff
   -     void start();
   +     static void start();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to