github-actions[bot] commented on code in PR #32180:
URL: https://github.com/apache/doris/pull/32180#discussion_r1524641095


##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -39,7 +40,32 @@ class VRuntimeFilterSlots {
               _runtime_filters(runtime_filters),
               _need_local_merge(need_local_merge) {}
 
-    Status init(RuntimeState* state, int64_t hash_table_size) {
+    Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool 
publish_local) {

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool publish_local) {
   ```
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -39,7 +40,32 @@
               _runtime_filters(runtime_filters),
               _need_local_merge(need_local_merge) {}
 
-    Status init(RuntimeState* state, int64_t hash_table_size) {
+    Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool 
publish_local) {
+        const auto max_in_num = state->runtime_filter_max_in_num();
+        const bool over_max_in_num = (hash_table_size >= max_in_num);
+        for (auto* runtime_filter : _runtime_filters) {
+            if (over_max_in_num &&
+                runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+                RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+            }
+
+            if (runtime_filter->is_bloomfilter() && !publish_local &&
+                runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+            }
+        }
+        return Status::OK();
+    }
+
+    void init_dependency(pipeline::CountedFinishDependency* dependency) {
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->is_bloomfilter() && 
runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
+            }
+        }
+    }
+
+    Status init(RuntimeState* state, int64_t hash_table_size, bool 
publish_local) {

Review Comment:
   warning: function 'init' exceeds recommended size/complexity thresholds [readability-function-size]
   ```cpp
       Status init(RuntimeState* state, int64_t hash_table_size, bool publish_local) {
              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/exprs/runtime_filter_slots.h:67:** 124 lines including whitespace and comments (threshold 80)
   ```cpp
       Status init(RuntimeState* state, int64_t hash_table_size, bool publish_local) {
              ^
   ```
   
   </details>
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -39,7 +40,32 @@
               _runtime_filters(runtime_filters),
               _need_local_merge(need_local_merge) {}
 
-    Status init(RuntimeState* state, int64_t hash_table_size) {
+    Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool 
publish_local) {
+        const auto max_in_num = state->runtime_filter_max_in_num();
+        const bool over_max_in_num = (hash_table_size >= max_in_num);
+        for (auto* runtime_filter : _runtime_filters) {
+            if (over_max_in_num &&
+                runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+                RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+            }
+
+            if (runtime_filter->is_bloomfilter() && !publish_local &&
+                runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+            }
+        }
+        return Status::OK();
+    }
+
+    void init_dependency(pipeline::CountedFinishDependency* dependency) {
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->is_bloomfilter() && 
runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
+            }
+        }
+    }
+
+    Status init(RuntimeState* state, int64_t hash_table_size, bool 
publish_local) {

Review Comment:
   warning: method 'init' can be made const [readability-make-member-function-const]
   
   ```suggestion
       Status init(RuntimeState* state, int64_t hash_table_size, bool publish_local) const {
   ```
   



##########
be/src/runtime/runtime_filter_mgr.h:
##########
@@ -20,8 +20,10 @@
 #include <gen_cpp/PaloInternalService_types.h>

Review Comment:
   warning: 'gen_cpp/PaloInternalService_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/PaloInternalService_types.h>
            ^
   ```
   



##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -303,6 +305,69 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId 
query_id,
     return Status::OK();
 }
 
+Status RuntimeFilterMergeControllerEntity::send_filter_size(const 
PSendFilterSizeRequest* request) {

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSizeRequest* request) {
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1357,71 +1357,117 @@
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (!fragment_instance_ids.empty()) {
-        UniqueId fragment_instance_id = fragment_instance_ids[0];
-        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
-        std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-
-        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-        ObjectPool* pool = nullptr;
-        if (is_pipeline) {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _pipeline_map.find(tfragment_instance_id);
-            if (iter == _pipeline_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
-                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
-            }
-            pip_context = iter->second;
+    {
+        std::unique_lock<std::mutex> lock(_lock);
+        for (UniqueId fragment_instance_id : fragment_instance_ids) {
+            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            pool = &pip_context->get_query_ctx()->obj_pool;
-        } else {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _fragment_instance_map.find(tfragment_instance_id);
-            if (iter == _fragment_instance_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment instance id:"
-                              << print_id(tfragment_instance_id);
-                return Status::InvalidArgument("fragment instance id: {}",
-                                               
print_id(tfragment_instance_id));
-            }
-            fragment_executor = iter->second;
+            if (is_pipeline) {
+                auto iter = _pipeline_map.find(tfragment_instance_id);
+                if (iter == _pipeline_map.end()) {
+                    continue;
+                }
+                pip_context = iter->second;
 
-            DCHECK(fragment_executor != nullptr);
-            runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
-            pool = &fragment_executor->get_query_ctx()->obj_pool;
-        }
+                DCHECK(pip_context != nullptr);
+                runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
+                pool = &pip_context->get_query_ctx()->obj_pool;
+            } else {
+                auto iter = _fragment_instance_map.find(tfragment_instance_id);
+                if (iter == _fragment_instance_map.end()) {
+                    continue;
+                }
+                fragment_executor = iter->second;
 
-        // 1. get the target filters
-        std::vector<IRuntimeFilter*> filters;
-        
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
-
-        // 2. create the filter wrapper to replace or ignore the target filters
-        if (request->has_in_filter() && 
request->in_filter().has_ignored_msg()) {
-            const auto& in_filter = request->in_filter();
-
-            std::ranges::for_each(filters, [&in_filter](auto& filter) {
-                filter->set_ignored(in_filter.ignored_msg());
-                filter->signal();
-            });
-        } else if (!filters.empty()) {
-            UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
-                                                filters[0]->column_type()};
-            RuntimePredicateWrapper* filter_wrapper = nullptr;
-            RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
-
-            std::ranges::for_each(filters, [&](auto& filter) {
-                filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
-            });
+                DCHECK(fragment_executor != nullptr);
+                runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+                pool = &fragment_executor->get_query_ctx()->obj_pool;
+            }
+            break;
         }
     }
 
+    if (runtime_filter_mgr == nullptr) {
+        // all instance finished
+        return Status::OK();
+    }
+
+    // 1. get the target filters
+    std::vector<IRuntimeFilter*> filters;
+    
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
+
+    // 2. create the filter wrapper to replace or ignore the target filters
+    if (request->has_in_filter() && request->in_filter().has_ignored_msg()) {
+        const auto& in_filter = request->in_filter();
+
+        std::ranges::for_each(filters, [&in_filter](auto& filter) {
+            filter->set_ignored(in_filter.ignored_msg());
+            filter->signal();
+        });
+    } else if (!filters.empty()) {
+        UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, 
filters[0]->column_type()};
+        RuntimePredicateWrapper* filter_wrapper = nullptr;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
+
+        std::ranges::for_each(filters, [&](auto& filter) {
+            filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+        });
+    }
+
     return Status::OK();
 }
 
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
+    UniqueId queryid = request->query_id();
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
+
+    std::shared_ptr<QueryContext> query_ctx;
+    {
+        TUniqueId query_id;
+        query_id.__set_hi(queryid.hi);
+        query_id.__set_lo(queryid.lo);
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _query_ctx_map.find(query_id);
+        if (iter == _query_ctx_map.end()) {
+            return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
+        }
+
+        query_ctx = iter->second;
+    }
+    auto merge_status = filter_controller->send_filter_size(request);
+    return merge_status;
+}
+
+Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {

Review Comment:
   warning: method 'sync_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1357,71 +1357,117 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (!fragment_instance_ids.empty()) {
-        UniqueId fragment_instance_id = fragment_instance_ids[0];
-        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
-        std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-
-        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-        ObjectPool* pool = nullptr;
-        if (is_pipeline) {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _pipeline_map.find(tfragment_instance_id);
-            if (iter == _pipeline_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
-                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
-            }
-            pip_context = iter->second;
+    {
+        std::unique_lock<std::mutex> lock(_lock);
+        for (UniqueId fragment_instance_id : fragment_instance_ids) {
+            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            pool = &pip_context->get_query_ctx()->obj_pool;
-        } else {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _fragment_instance_map.find(tfragment_instance_id);
-            if (iter == _fragment_instance_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment instance id:"
-                              << print_id(tfragment_instance_id);
-                return Status::InvalidArgument("fragment instance id: {}",
-                                               
print_id(tfragment_instance_id));
-            }
-            fragment_executor = iter->second;
+            if (is_pipeline) {
+                auto iter = _pipeline_map.find(tfragment_instance_id);
+                if (iter == _pipeline_map.end()) {
+                    continue;
+                }
+                pip_context = iter->second;
 
-            DCHECK(fragment_executor != nullptr);
-            runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
-            pool = &fragment_executor->get_query_ctx()->obj_pool;
-        }
+                DCHECK(pip_context != nullptr);
+                runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
+                pool = &pip_context->get_query_ctx()->obj_pool;
+            } else {
+                auto iter = _fragment_instance_map.find(tfragment_instance_id);
+                if (iter == _fragment_instance_map.end()) {
+                    continue;
+                }
+                fragment_executor = iter->second;
 
-        // 1. get the target filters
-        std::vector<IRuntimeFilter*> filters;
-        
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
-
-        // 2. create the filter wrapper to replace or ignore the target filters
-        if (request->has_in_filter() && 
request->in_filter().has_ignored_msg()) {
-            const auto& in_filter = request->in_filter();
-
-            std::ranges::for_each(filters, [&in_filter](auto& filter) {
-                filter->set_ignored(in_filter.ignored_msg());
-                filter->signal();
-            });
-        } else if (!filters.empty()) {
-            UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
-                                                filters[0]->column_type()};
-            RuntimePredicateWrapper* filter_wrapper = nullptr;
-            RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
-
-            std::ranges::for_each(filters, [&](auto& filter) {
-                filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
-            });
+                DCHECK(fragment_executor != nullptr);
+                runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+                pool = &fragment_executor->get_query_ctx()->obj_pool;
+            }
+            break;
         }
     }
 
+    if (runtime_filter_mgr == nullptr) {
+        // all instance finished
+        return Status::OK();
+    }
+
+    // 1. get the target filters
+    std::vector<IRuntimeFilter*> filters;
+    
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
+
+    // 2. create the filter wrapper to replace or ignore the target filters
+    if (request->has_in_filter() && request->in_filter().has_ignored_msg()) {
+        const auto& in_filter = request->in_filter();
+
+        std::ranges::for_each(filters, [&in_filter](auto& filter) {
+            filter->set_ignored(in_filter.ignored_msg());
+            filter->signal();
+        });
+    } else if (!filters.empty()) {
+        UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, 
filters[0]->column_type()};
+        RuntimePredicateWrapper* filter_wrapper = nullptr;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
+
+        std::ranges::for_each(filters, [&](auto& filter) {
+            filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+        });
+    }
+
     return Status::OK();
 }
 
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
   ```
   



##########
be/src/runtime/runtime_filter_mgr.h:
##########
@@ -79,6 +84,15 @@
 
     Status get_consume_filters(const int filter_id, 
std::vector<IRuntimeFilter*>& consumer_filters);
 
+    IRuntimeFilter* try_get_product_filter(const int filter_id) {

Review Comment:
   warning: method 'try_get_product_filter' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static IRuntimeFilter* try_get_product_filter(const int filter_id) {
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -1167,9 +1167,36 @@
         auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
         Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-        if (!st.ok()) {
-            LOG(WARNING) << "merge meet error" << st.to_string();
-        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }
+}
+
+void PInternalService::send_filter_size(::google::protobuf::RpcController* 
controller,
+                                        const ::doris::PSendFilterSizeRequest* 
request,
+                                        ::doris::PSendFilterSizeResponse* 
response,
+                                        ::google::protobuf::Closure* done) {
+    bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+        brpc::ClosureGuard closure_guard(done);
+        Status st = _exec_env->fragment_mgr()->send_filter_size(request);
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }
+}
+
+void PInternalService::sync_filter_size(::google::protobuf::RpcController* 
controller,

Review Comment:
   warning: method 'sync_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void PInternalService::sync_filter_size(::google::protobuf::RpcController* controller,
   ```
   



##########
be/src/service/internal_service.cpp:
##########
@@ -1167,9 +1167,36 @@ void 
PInternalService::merge_filter(::google::protobuf::RpcController* controlle
         auto attachment = 
static_cast<brpc::Controller*>(controller)->request_attachment();
         butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
         Status st = _exec_env->fragment_mgr()->merge_filter(request, 
&zero_copy_input_stream);
-        if (!st.ok()) {
-            LOG(WARNING) << "merge meet error" << st.to_string();
-        }
+        st.to_protobuf(response->mutable_status());
+    });
+    if (!ret) {
+        offer_failed(response, done, _light_work_pool);
+        return;
+    }
+}
+
+void PInternalService::send_filter_size(::google::protobuf::RpcController* 
controller,

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void PInternalService::send_filter_size(::google::protobuf::RpcController* controller,
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to