github-actions[bot] commented on code in PR #33504:
URL: https://github.com/apache/doris/pull/33504#discussion_r1559717800


##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -34,130 +34,77 @@
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
-            : _build_expr_context(build_expr_ctxs),
-              _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
-            }
-            for (auto* filter : filters) {
-                filter->set_ignored("");
-                filter->signal();
-            }
-            return Status::OK();
-        };
+            const std::vector<IRuntimeFilter*>& runtime_filters)
+            : _build_expr_context(build_expr_ctxs), 
_runtime_filters(runtime_filters) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
             return Status::OK();
-        };
-
-        // ordered vector: IN, IN_OR_BLOOM, others.
-        // so we can ignore other filter if IN Predicate exists.
-        auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
-            if (d1->type() == d2->type()) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return false;
-            } else {
-                return d1->type() < d2->type();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
             }
-        };
-        std::sort(_runtime_filters.begin(), _runtime_filters.end(), 
compare_desc);
-
-        // do not create 'in filter' when hash_table size over limit
-        const auto max_in_num = state->runtime_filter_max_in_num();
-        const bool over_max_in_num = (hash_table_size >= max_in_num);
+        }
 
+        // send_filter_size may call dependency->sub(), so we call 
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
         for (auto* runtime_filter : _runtime_filters) {
-            if (runtime_filter->expr_order() < 0 ||
-                runtime_filter->expr_order() >= _build_expr_context.size()) {
-                return Status::InternalError(
-                        "runtime_filter meet invalid expr_order, 
expr_order={}, "
-                        "_build_expr_context.size={}",
-                        runtime_filter->expr_order(), 
_build_expr_context.size());
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
             }
+        }
+        return Status::OK();
+    }
 
-            bool is_in_filter = (runtime_filter->type() == 
RuntimeFilterType::IN_FILTER);
+    // use synced size when this rf has global merged
+    static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t 
hash_table_size) {
+        return runtime_filter->isset_synced_size() ? 
runtime_filter->get_synced_size()
+                                                   : hash_table_size;
+    }
 
-            if (over_max_in_num &&
-                runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+    Status ignore_filters(RuntimeState* state) {

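Review Comment:
   warning: method 'ignore_filters' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status ignore_filters(RuntimeState* state) {
   ```
   
   Note (review): this looks like a false positive — the body of `ignore_filters` iterates over the member `_runtime_filters`, so it uses `this` and cannot be made static as written.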
   



##########
be/src/pipeline/exec/hashjoin_build_sink.cpp:
##########
@@ -112,6 +121,34 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* 
state) {
     return Status::OK();
 }
 
+Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status 
exec_status) {

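Review Comment:
   warning: method 'close' can be made const [readability-make-member-function-const]
   
   be/src/pipeline/exec/hashjoin_build_sink.h:74:
   ```diff
   -     Status close(RuntimeState* state, Status exec_status) override;
   +     Status close(RuntimeState* state, Status exec_status) const override;
   ```
   
   ```suggestion
   Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) const {
   ```
   
   Note (review): applying this would break the build. `close` is declared `override`, and the existing (non-const) declaration is what matches the base-class virtual; adding `const` changes the signature so it no longer overrides anything, and the `override` specifier then becomes a compile error.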
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1348,78 +1348,112 @@
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+    QueryThreadContext query_thread_context;
+
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (!fragment_instance_ids.empty()) {
-        UniqueId fragment_instance_id = fragment_instance_ids[0];
-        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
-        std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-        QueryThreadContext query_thread_context;
-
-        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-        ObjectPool* pool = nullptr;
-        if (is_pipeline) {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _pipeline_map.find(tfragment_instance_id);
-            if (iter == _pipeline_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
-                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
-            }
-            pip_context = iter->second;
+    {
+        std::unique_lock<std::mutex> lock(_lock);
+        for (UniqueId fragment_instance_id : fragment_instance_ids) {
+            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            pool = &pip_context->get_query_ctx()->obj_pool;
-            query_thread_context = {pip_context->get_query_ctx()->query_id(),
-                                    
pip_context->get_query_ctx()->query_mem_tracker};
-        } else {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _fragment_instance_map.find(tfragment_instance_id);
-            if (iter == _fragment_instance_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment instance id:"
-                              << print_id(tfragment_instance_id);
-                return Status::InvalidArgument("fragment instance id: {}",
-                                               
print_id(tfragment_instance_id));
-            }
-            fragment_executor = iter->second;
+            if (is_pipeline) {
+                auto iter = _pipeline_map.find(tfragment_instance_id);
+                if (iter == _pipeline_map.end()) {
+                    continue;
+                }
+                pip_context = iter->second;
 
-            DCHECK(fragment_executor != nullptr);
-            runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
-            pool = &fragment_executor->get_query_ctx()->obj_pool;
-            query_thread_context = 
{fragment_executor->get_query_ctx()->query_id(),
-                                    
fragment_executor->get_query_ctx()->query_mem_tracker};
-        }
+                DCHECK(pip_context != nullptr);
+                runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
+                pool = &pip_context->get_query_ctx()->obj_pool;
+                query_thread_context = 
{pip_context->get_query_ctx()->query_id(),
+                                        
pip_context->get_query_ctx()->query_mem_tracker};
+            } else {
+                auto iter = _fragment_instance_map.find(tfragment_instance_id);
+                if (iter == _fragment_instance_map.end()) {
+                    continue;
+                }
+                fragment_executor = iter->second;
 
-        SCOPED_ATTACH_TASK(query_thread_context);
-
-        // 1. get the target filters
-        std::vector<IRuntimeFilter*> filters;
-        
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
-
-        // 2. create the filter wrapper to replace or ignore the target filters
-        if (request->has_in_filter() && 
request->in_filter().has_ignored_msg()) {
-            const auto& in_filter = request->in_filter();
-
-            std::ranges::for_each(filters, [&in_filter](auto& filter) {
-                filter->set_ignored(in_filter.ignored_msg());
-                filter->signal();
-            });
-        } else if (!filters.empty()) {
-            UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
-                                                filters[0]->column_type()};
-            RuntimePredicateWrapper* filter_wrapper = nullptr;
-            RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
-
-            std::ranges::for_each(filters, [&](auto& filter) {
-                filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
-            });
+                DCHECK(fragment_executor != nullptr);
+                runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+                pool = &fragment_executor->get_query_ctx()->obj_pool;
+                query_thread_context = 
{fragment_executor->get_query_ctx()->query_id(),
+                                        
fragment_executor->get_query_ctx()->query_mem_tracker};
+            }
+            break;
         }
     }
 
+    if (runtime_filter_mgr == nullptr) {
+        // all instance finished
+        return Status::OK();
+    }
+
+    SCOPED_ATTACH_TASK(query_thread_context);
+    // 1. get the target filters
+    std::vector<IRuntimeFilter*> filters;
+    
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
+
+    // 2. create the filter wrapper to replace or ignore the target filters
+    if (!filters.empty()) {
+        UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, 
filters[0]->column_type()};
+        RuntimePredicateWrapper* filter_wrapper = nullptr;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
+
+        std::ranges::for_each(filters, [&](auto& filter) {
+            filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+        });
+    }
+
     return Status::OK();
 }
 
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
+    UniqueId queryid = request->query_id();
+    std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+    RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, 
&filter_controller));
+
+    std::shared_ptr<QueryContext> query_ctx;
+    {
+        TUniqueId query_id;
+        query_id.__set_hi(queryid.hi);
+        query_id.__set_lo(queryid.lo);
+        std::lock_guard<std::mutex> lock(_lock);
+        auto iter = _query_ctx_map.find(query_id);
+        if (iter == _query_ctx_map.end()) {
+            return Status::InvalidArgument("query-id: {}", 
queryid.to_string());
+        }
+
+        query_ctx = iter->second;
+    }
+    auto merge_status = filter_controller->send_filter_size(request);
+    return merge_status;
+}
+
+Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {

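Review Comment:
   warning: method 'sync_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
   ```
   
   Note (review): this suggestion does not compile as written — `static` may not appear on an out-of-class member function definition. If the method is made static, add the keyword to the in-class declaration in the header and leave this definition line unchanged.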
   



##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -311,6 +314,67 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId 
query_id,
     return Status::OK();
 }
 
+Status RuntimeFilterMergeControllerEntity::send_filter_size(const 
PSendFilterSizeRequest* request) {

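Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSizeRequest* request) {
   ```
   
   Note (review): this suggestion does not compile as written — `static` may not appear on an out-of-class member function definition. If the method is made static, add the keyword to the in-class declaration in the header and leave this definition line unchanged.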
   



##########
be/src/vec/exprs/vexpr.cpp:
##########
@@ -407,6 +409,39 @@ Status VExpr::create_expr_trees(const std::vector<TExpr>& 
texprs, VExprContextSP
     return Status::OK();
 }
 
+Status VExpr::check_expr_output_type(const VExprContextSPtrs& ctxs,

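Review Comment:
   warning: method 'check_expr_output_type' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status VExpr::check_expr_output_type(const VExprContextSPtrs& ctxs,
   ```
   
   Note (review): this suggestion does not compile as written — `static` may not appear on an out-of-class member function definition. If the method is made static, add the keyword to the in-class declaration in the header and leave this definition line unchanged.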
   



##########
be/test/vec/data_types/serde/data_type_serde_pb_test.cpp:
##########
@@ -19,6 +19,7 @@
 #include <gen_cpp/types.pb.h>

Review Comment:
   warning: 'gen_cpp/types.pb.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/types.pb.h>
            ^
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1348,78 +1348,112 @@ Status FragmentMgr::apply_filterv2(const 
PPublishFilterRequestV2* request,
     bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
     int64_t start_apply = MonotonicMillis();
 
+    std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+    std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+    QueryThreadContext query_thread_context;
+
+    RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+    ObjectPool* pool = nullptr;
+
     const auto& fragment_instance_ids = request->fragment_instance_ids();
-    if (!fragment_instance_ids.empty()) {
-        UniqueId fragment_instance_id = fragment_instance_ids[0];
-        TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
-        std::shared_ptr<PlanFragmentExecutor> fragment_executor;
-        std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-        QueryThreadContext query_thread_context;
-
-        RuntimeFilterMgr* runtime_filter_mgr = nullptr;
-        ObjectPool* pool = nullptr;
-        if (is_pipeline) {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _pipeline_map.find(tfragment_instance_id);
-            if (iter == _pipeline_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment-id:" << 
fragment_instance_id;
-                return Status::InvalidArgument("fragment-id: {}", 
fragment_instance_id.to_string());
-            }
-            pip_context = iter->second;
+    {
+        std::unique_lock<std::mutex> lock(_lock);
+        for (UniqueId fragment_instance_id : fragment_instance_ids) {
+            TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
 
-            DCHECK(pip_context != nullptr);
-            runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
-            pool = &pip_context->get_query_ctx()->obj_pool;
-            query_thread_context = {pip_context->get_query_ctx()->query_id(),
-                                    
pip_context->get_query_ctx()->query_mem_tracker};
-        } else {
-            std::unique_lock<std::mutex> lock(_lock);
-            auto iter = _fragment_instance_map.find(tfragment_instance_id);
-            if (iter == _fragment_instance_map.end()) {
-                VLOG_CRITICAL << "unknown.... fragment instance id:"
-                              << print_id(tfragment_instance_id);
-                return Status::InvalidArgument("fragment instance id: {}",
-                                               
print_id(tfragment_instance_id));
-            }
-            fragment_executor = iter->second;
+            if (is_pipeline) {
+                auto iter = _pipeline_map.find(tfragment_instance_id);
+                if (iter == _pipeline_map.end()) {
+                    continue;
+                }
+                pip_context = iter->second;
 
-            DCHECK(fragment_executor != nullptr);
-            runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
-            pool = &fragment_executor->get_query_ctx()->obj_pool;
-            query_thread_context = 
{fragment_executor->get_query_ctx()->query_id(),
-                                    
fragment_executor->get_query_ctx()->query_mem_tracker};
-        }
+                DCHECK(pip_context != nullptr);
+                runtime_filter_mgr = 
pip_context->get_query_ctx()->runtime_filter_mgr();
+                pool = &pip_context->get_query_ctx()->obj_pool;
+                query_thread_context = 
{pip_context->get_query_ctx()->query_id(),
+                                        
pip_context->get_query_ctx()->query_mem_tracker};
+            } else {
+                auto iter = _fragment_instance_map.find(tfragment_instance_id);
+                if (iter == _fragment_instance_map.end()) {
+                    continue;
+                }
+                fragment_executor = iter->second;
 
-        SCOPED_ATTACH_TASK(query_thread_context);
-
-        // 1. get the target filters
-        std::vector<IRuntimeFilter*> filters;
-        
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
-
-        // 2. create the filter wrapper to replace or ignore the target filters
-        if (request->has_in_filter() && 
request->in_filter().has_ignored_msg()) {
-            const auto& in_filter = request->in_filter();
-
-            std::ranges::for_each(filters, [&in_filter](auto& filter) {
-                filter->set_ignored(in_filter.ignored_msg());
-                filter->signal();
-            });
-        } else if (!filters.empty()) {
-            UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
-                                                filters[0]->column_type()};
-            RuntimePredicateWrapper* filter_wrapper = nullptr;
-            RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
-
-            std::ranges::for_each(filters, [&](auto& filter) {
-                filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
-            });
+                DCHECK(fragment_executor != nullptr);
+                runtime_filter_mgr = 
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+                pool = &fragment_executor->get_query_ctx()->obj_pool;
+                query_thread_context = 
{fragment_executor->get_query_ctx()->query_id(),
+                                        
fragment_executor->get_query_ctx()->query_mem_tracker};
+            }
+            break;
         }
     }
 
+    if (runtime_filter_mgr == nullptr) {
+        // all instance finished
+        return Status::OK();
+    }
+
+    SCOPED_ATTACH_TASK(query_thread_context);
+    // 1. get the target filters
+    std::vector<IRuntimeFilter*> filters;
+    
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(), 
filters));
+
+    // 2. create the filter wrapper to replace or ignore the target filters
+    if (!filters.empty()) {
+        UpdateRuntimeFilterParamsV2 params {request, attach_data, pool, 
filters[0]->column_type()};
+        RuntimePredicateWrapper* filter_wrapper = nullptr;
+        RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params, 
&filter_wrapper));
+
+        std::ranges::for_each(filters, [&](auto& filter) {
+            filter->update_filter(filter_wrapper, request->merge_time(), 
start_apply);
+        });
+    }
+
     return Status::OK();
 }
 
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {

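Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
   ```
   
   Note (review): this looks like a false positive — the body of `FragmentMgr::send_filter_size` reads the members `_runtimefilter_controller`, `_lock` and `_query_ctx_map`, so it cannot be static. In addition, `static` may not appear on an out-of-class member function definition, so the suggested line would not compile.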
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -34,130 +34,77 @@
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
-            : _build_expr_context(build_expr_ctxs),
-              _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
-            }
-            for (auto* filter : filters) {
-                filter->set_ignored("");
-                filter->signal();
-            }
-            return Status::OK();
-        };
+            const std::vector<IRuntimeFilter*>& runtime_filters)
+            : _build_expr_context(build_expr_ctxs), 
_runtime_filters(runtime_filters) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
             return Status::OK();
-        };
-
-        // ordered vector: IN, IN_OR_BLOOM, others.
-        // so we can ignore other filter if IN Predicate exists.
-        auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
-            if (d1->type() == d2->type()) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return false;
-            } else {
-                return d1->type() < d2->type();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
             }
-        };
-        std::sort(_runtime_filters.begin(), _runtime_filters.end(), 
compare_desc);
-
-        // do not create 'in filter' when hash_table size over limit
-        const auto max_in_num = state->runtime_filter_max_in_num();
-        const bool over_max_in_num = (hash_table_size >= max_in_num);
+        }
 
+        // send_filter_size may call dependency->sub(), so we call 
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
         for (auto* runtime_filter : _runtime_filters) {
-            if (runtime_filter->expr_order() < 0 ||
-                runtime_filter->expr_order() >= _build_expr_context.size()) {
-                return Status::InternalError(
-                        "runtime_filter meet invalid expr_order, 
expr_order={}, "
-                        "_build_expr_context.size={}",
-                        runtime_filter->expr_order(), 
_build_expr_context.size());
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
             }
+        }
+        return Status::OK();
+    }
 
-            bool is_in_filter = (runtime_filter->type() == 
RuntimeFilterType::IN_FILTER);
+    // use synced size when this rf has global merged
+    static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t 
hash_table_size) {
+        return runtime_filter->isset_synced_size() ? 
runtime_filter->get_synced_size()
+                                                   : hash_table_size;
+    }
 
-            if (over_max_in_num &&
-                runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+    Status ignore_filters(RuntimeState* state) {
+        // process ignore duplicate IN_FILTER
+        std::unordered_set<int> has_in_filter;
+        for (auto* filter : _runtime_filters) {
+            if (filter->get_real_type() != RuntimeFilterType::IN_FILTER) {
+                continue;
             }
-
-            if (runtime_filter->is_bloomfilter()) {
-                
RETURN_IF_ERROR(runtime_filter->init_bloom_filter(hash_table_size));
+            if (has_in_filter.contains(filter->expr_order())) {
+                filter->set_ignored();
+                continue;
             }
+            has_in_filter.insert(filter->expr_order());
+        }
 
-            // Note:
-            // In the case that exist *remote target* and in filter and other 
filter,
-            // we must merge other filter whatever in filter is over the max 
num in current node,
-            // because:
-            // case 1: (in filter >= max num) in current node, so in filter 
will be ignored,
-            //         and then other filter can be used
-            // case 2: (in filter < max num) in current node, we don't know 
whether the in filter
-            //         will be ignored in merge node, so we must transfer 
other filter to merge node
-            if (!runtime_filter->has_remote_target()) {
-                bool exists_in_filter = 
has_in_filter[runtime_filter->expr_order()];
-                if (is_in_filter && over_max_in_num) {
-                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
-                               << " ignore runtime filter(in filter id "
-                               << runtime_filter->filter_id() << ") because: 
in_num("
-                               << hash_table_size << ") >= max_in_num(" << 
max_in_num << ")";
-                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
-                    continue;
-                } else if (!is_in_filter && exists_in_filter) {
-                    // do not create 'bloom filter' and 'minmax filter' when 
'in filter' has created
-                    // because in filter is exactly filter, so it is enough to 
filter data
-                    VLOG_DEBUG << "fragment instance " << 
print_id(state->fragment_instance_id())
-                               << " ignore runtime filter("
-                               << 
IRuntimeFilter::to_string(runtime_filter->type()) << " id "
-                               << runtime_filter->filter_id()
-                               << ") because: already exists in filter";
-                    
RETURN_IF_ERROR(ignore_local_filter(runtime_filter->filter_id()));
-                    continue;
-                }
-            } else if (is_in_filter && over_max_in_num) {
-                std::string msg = fmt::format(
-                        "fragment instance {} ignore runtime filter(in filter 
id {}) because: "
-                        "in_num({}) >= max_in_num({})",
-                        print_id(state->fragment_instance_id()), 
runtime_filter->filter_id(),
-                        hash_table_size, max_in_num);
-                RETURN_IF_ERROR(ignore_remote_filter(runtime_filter, msg));
+        // process ignore filter when it has IN_FILTER on same expr, and init 
bloom filter size
+        for (auto* filter : _runtime_filters) {
+            if (filter->get_real_type() == RuntimeFilterType::IN_FILTER ||
+                !has_in_filter.contains(filter->expr_order())) {
                 continue;
             }
+            filter->set_ignored();
+        }
+        return Status::OK();
+    }
 
-            if ((runtime_filter->type() == RuntimeFilterType::IN_FILTER) ||
-                (runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
-                 !over_max_in_num)) {
-                has_in_filter[runtime_filter->expr_order()] = true;
+    Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {

Review Comment:
   warning: method 'init_filters' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
   ```
   



##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -34,130 +34,77 @@ class VRuntimeFilterSlots {
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
-            : _build_expr_context(build_expr_ctxs),
-              _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
-            }
-            for (auto* filter : filters) {
-                filter->set_ignored("");
-                filter->signal();
-            }
-            return Status::OK();
-        };
+            const std::vector<IRuntimeFilter*>& runtime_filters)
+            : _build_expr_context(build_expr_ctxs), 
_runtime_filters(runtime_filters) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,

Review Comment:
   warning: method 'send_filter_size' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
       static Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, bool publish_local,
   ```
   



##########
be/src/exprs/runtime_filter.cpp:
##########
@@ -434,30 +439,33 @@ class RuntimePredicateWrapper {
                 bitmaps.push_back(&(col->get_data()[i]));
             }
         }
-        _context.bitmap_filter_func->insert_many(bitmaps);
+        _context->bitmap_filter_func->insert_many(bitmaps);
     }
 
     RuntimeFilterType get_real_type() const {
-        auto real_filter_type = _filter_type;
-        if (real_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-            real_filter_type = _is_bloomfilter ? 
RuntimeFilterType::BLOOM_FILTER
-                                               : RuntimeFilterType::IN_FILTER;
+        if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+            if (_context->hybrid_set) {
+                return RuntimeFilterType::IN_FILTER;
+            }
+            return RuntimeFilterType::BLOOM_FILTER;
         }
-        return real_filter_type;
+        return _filter_type;
     }
 
     size_t get_bloom_filter_size() const {
-        if (_is_bloomfilter) {
-            return _context.bloom_filter_func->get_size();
-        }
-        return 0;
+        return _context->bloom_filter_func ? 
_context->bloom_filter_func->get_size() : 0;
     }
 
     Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
                           std::vector<vectorized::VRuntimeFilterPtr>& 
push_exprs,
                           const TExpr& probe_expr);
 
     Status merge(const RuntimePredicateWrapper* wrapper) {

Review Comment:
   warning: function 'merge' has cognitive complexity of 73 (threshold 50) 
[readability-function-cognitive-complexity]
   ```cpp
       Status merge(const RuntimePredicateWrapper* wrapper) {
              ^
   ```
   <details>
   <summary>Additional context</summary>
   
   **be/src/exprs/runtime_filter.cpp:463:** +1, including nesting penalty of 0, 
nesting level increased to 1
   ```cpp
           if (is_ignored() || wrapper->is_ignored()) {
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:463:** +1
   ```cpp
           if (is_ignored() || wrapper->is_ignored()) {
                            ^
   ```
   **be/src/exprs/runtime_filter.cpp:469:** +1
   ```cpp
                   _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                                                                         ^
   ```
   **be/src/exprs/runtime_filter.cpp:474:** +1
   ```cpp
           bool can_not_merge_other = _filter_type != 
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                                                                                
            ^
   ```
   **be/src/exprs/runtime_filter.cpp:477:** +1
   ```cpp
           CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
                                            ^
   ```
   **be/src/exprs/runtime_filter.cpp:482:** +1, including nesting penalty of 0, 
nesting level increased to 1
   ```cpp
           switch (_filter_type) {
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:486:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
               if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
               ^
   ```
   **be/src/exprs/runtime_filter.cpp:496:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
               
RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get()));
               ^
   ```
   **be/src/common/status.h:538:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:496:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
               
RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get()));
               ^
   ```
   **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:500:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
               RETURN_IF_ERROR(
               ^
   ```
   **be/src/common/status.h:538:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:500:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
               RETURN_IF_ERROR(
               ^
   ```
   **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:508:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
               if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
               ^
   ```
   **be/src/exprs/runtime_filter.cpp:512:** +2, including nesting penalty of 1, 
nesting level increased to 2
   ```cpp
               if (real_filter_type == RuntimeFilterType::IN_FILTER) {
               ^
   ```
   **be/src/exprs/runtime_filter.cpp:513:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
                   if (other_filter_type == RuntimeFilterType::IN_FILTER) { // 
in merge in
                   ^
   ```
   **be/src/exprs/runtime_filter.cpp:515:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
                       if (_max_in_num >= 0 && _context->hybrid_set->size() >= 
_max_in_num) {
                       ^
   ```
   **be/src/exprs/runtime_filter.cpp:519:** +5, including nesting penalty of 4, 
nesting level increased to 5
   ```cpp
                           RETURN_IF_ERROR(change_to_bloom_filter(true));
                           ^
   ```
   **be/src/common/status.h:538:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:519:** +6, including nesting penalty of 5, 
nesting level increased to 6
   ```cpp
                           RETURN_IF_ERROR(change_to_bloom_filter(true));
                           ^
   ```
   **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:521:** +1, nesting level increased to 3
   ```cpp
                   } else {
                     ^
   ```
   **be/src/exprs/runtime_filter.cpp:524:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(change_to_bloom_filter(false));
                       ^
   ```
   **be/src/common/status.h:538:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:524:** +5, including nesting penalty of 4, 
nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(change_to_bloom_filter(false));
                       ^
   ```
   **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:525:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(_context->bloom_filter_func->merge(
                       ^
   ```
   **be/src/common/status.h:538:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:525:** +5, including nesting penalty of 4, 
nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(_context->bloom_filter_func->merge(
                       ^
   ```
   **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   **be/src/exprs/runtime_filter.cpp:528:** +1, nesting level increased to 2
   ```cpp
               } else {
                 ^
   ```
   **be/src/exprs/runtime_filter.cpp:529:** +3, including nesting penalty of 2, 
nesting level increased to 3
   ```cpp
                   if (other_filter_type == RuntimeFilterType::IN_FILTER) { // 
bloom filter merge in
                   ^
   ```
   **be/src/exprs/runtime_filter.cpp:532:** +1, nesting level increased to 3
   ```cpp
                   } else {
                     ^
   ```
   **be/src/exprs/runtime_filter.cpp:533:** +4, including nesting penalty of 3, 
nesting level increased to 4
   ```cpp
                       RETURN_IF_ERROR(_context->bloom_filter_func->merge(
                       ^
   ```
   **be/src/common/status.h:538:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
       do {                                \
       ^
   ```
   **be/src/exprs/runtime_filter.cpp:533:** +5, including nesting penalty of 4, 
nesting level increased to 5
   ```cpp
                       RETURN_IF_ERROR(_context->bloom_filter_func->merge(
                       ^
   ```
   **be/src/common/status.h:540:** expanded from macro 'RETURN_IF_ERROR'
   ```cpp
           if (UNLIKELY(!_status_.ok())) { \
           ^
   ```
   
   </details>
   



##########
be/src/runtime/runtime_filter_mgr.h:
##########
@@ -20,8 +20,10 @@
 #include <gen_cpp/PaloInternalService_types.h>

Review Comment:
   warning: 'gen_cpp/PaloInternalService_types.h' file not found 
[clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/PaloInternalService_types.h>
            ^
   ```
   



##########
be/src/runtime/runtime_filter_mgr.h:
##########
@@ -81,6 +86,15 @@
 
     Status get_consume_filters(const int filter_id, 
std::vector<IRuntimeFilter*>& consumer_filters);
 
+    IRuntimeFilter* try_get_product_filter(const int filter_id) {

Review Comment:
   warning: method 'try_get_product_filter' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
       static IRuntimeFilter* try_get_product_filter(const int filter_id) {
   ```
   



##########
be/src/vec/data_types/serde/data_type_jsonb_serde.cpp:
##########
@@ -239,6 +239,34 @@
                     parser.getWriter().getOutput()->getSize());
     return Status::OK();
 }
+Status DataTypeJsonbSerDe::write_column_to_pb(const IColumn& column, PValues& 
result, int start,
+                                              int end) const {
+    const auto& string_column = assert_cast<const ColumnString&>(column);
+    result.mutable_string_value()->Reserve(end - start);
+    auto* ptype = result.mutable_type();
+    ptype->set_id(PGenericType::JSONB);
+    for (size_t row_num = start; row_num < end; ++row_num) {
+        const auto& string_ref = string_column.get_data_at(row_num);
+        if (string_ref.size > 0) {
+            result.add_string_value(
+                    JsonbToJson::jsonb_to_json_string(string_ref.data, 
string_ref.size));
+        } else {
+            result.add_string_value(NULL_IN_CSV_FOR_ORDINARY_TYPE);
+        }
+    }
+    return Status::OK();
+}
 
+Status DataTypeJsonbSerDe::read_column_from_pb(IColumn& column, const PValues& 
arg) const {

Review Comment:
   warning: method 'read_column_from_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   Status DataTypeJsonbSerDe::read_column_from_pb(IColumn& column, const PValues& arg) {
   ```
   
   be/src/vec/data_types/serde/data_type_jsonb_serde.h:69:
   ```diff
   -     Status read_column_from_pb(IColumn& column, const PValues& arg) const override;
   +     static Status read_column_from_pb(IColumn& column, const PValues& arg) override;
   ```
   



##########
be/src/vec/data_types/serde/data_type_jsonb_serde.cpp:
##########
@@ -239,6 +239,34 @@ Status 
DataTypeJsonbSerDe::read_one_cell_from_json(IColumn& column,
                     parser.getWriter().getOutput()->getSize());
     return Status::OK();
 }
+Status DataTypeJsonbSerDe::write_column_to_pb(const IColumn& column, PValues& 
result, int start,

Review Comment:
   warning: method 'write_column_to_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/vec/data_types/serde/data_type_jsonb_serde.cpp:242:
   ```diff
   -                                               int end) const {
   +                                               int end) {
   ```
   
   be/src/vec/data_types/serde/data_type_jsonb_serde.h:67:
   ```diff
   -     Status write_column_to_pb(const IColumn& column, PValues& result, int start,
   -                               int end) const override;
   +     static Status write_column_to_pb(const IColumn& column, PValues& result, int start,
   +                               int end) override;
   ```
   



##########
be/src/vec/data_types/serde/data_type_ipv4_serde.cpp:
##########
@@ -88,5 +88,25 @@
     return Status::OK();
 }
 
+Status DataTypeIPv4SerDe::write_column_to_pb(const IColumn& column, PValues& 
result, int start,
+                                             int end) const {
+    const auto& column_data = assert_cast<const 
ColumnIPv4&>(column).get_data();
+    auto* ptype = result.mutable_type();
+    ptype->set_id(PGenericType::IPV4);
+    auto* values = result.mutable_uint32_value();
+    values->Reserve(end - start);
+    values->Add(column_data.begin() + start, column_data.begin() + end);
+    return Status::OK();
+}
+
+Status DataTypeIPv4SerDe::read_column_from_pb(IColumn& column, const PValues& 
arg) const {

Review Comment:
   warning: method 'read_column_from_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   Status DataTypeIPv4SerDe::read_column_from_pb(IColumn& column, const PValues& arg) {
   ```
   
   be/src/vec/data_types/serde/data_type_ipv4_serde.h:54:
   ```diff
   -     Status read_column_from_pb(IColumn& column, const PValues& arg) const override;
   +     static Status read_column_from_pb(IColumn& column, const PValues& arg) override;
   ```
   



##########
be/src/vec/data_types/serde/data_type_ipv4_serde.cpp:
##########
@@ -88,5 +88,25 @@ Status 
DataTypeIPv4SerDe::deserialize_one_cell_from_json(IColumn& column, Slice&
     return Status::OK();
 }
 
+Status DataTypeIPv4SerDe::write_column_to_pb(const IColumn& column, PValues& 
result, int start,

Review Comment:
   warning: method 'write_column_to_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/vec/data_types/serde/data_type_ipv4_serde.cpp:91:
   ```diff
   -                                              int end) const {
   +                                              int end) {
   ```
   
   be/src/vec/data_types/serde/data_type_ipv4_serde.h:52:
   ```diff
   -     Status write_column_to_pb(const IColumn& column, PValues& result, int start,
   -                               int end) const override;
   +     static Status write_column_to_pb(const IColumn& column, PValues& result, int start,
   +                               int end) override;
   ```
   



##########
be/src/vec/data_types/serde/data_type_ipv6_serde.cpp:
##########
@@ -88,5 +89,28 @@
     return Status::OK();
 }
 
+Status DataTypeIPv6SerDe::write_column_to_pb(const IColumn& column, PValues& 
result, int start,
+                                             int end) const {
+    const auto& column_data = assert_cast<const ColumnIPv6&>(column);
+    result.mutable_bytes_value()->Reserve(end - start);
+    auto* ptype = result.mutable_type();
+    ptype->set_id(PGenericType::IPV6);
+    for (int i = start; i < end; ++i) {
+        const auto& val = column_data.get_data_at(i);
+        result.add_bytes_value(val.data, val.size);
+    }
+    return Status::OK();
+}
+
+Status DataTypeIPv6SerDe::read_column_from_pb(IColumn& column, const PValues& 
arg) const {

Review Comment:
   warning: method 'read_column_from_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   Status DataTypeIPv6SerDe::read_column_from_pb(IColumn& column, const PValues& arg) {
   ```
   
   be/src/vec/data_types/serde/data_type_ipv6_serde.h:57:
   ```diff
   -     Status read_column_from_pb(IColumn& column, const PValues& arg) const override;
   +     static Status read_column_from_pb(IColumn& column, const PValues& arg) override;
   ```
   



##########
be/src/vec/data_types/serde/data_type_ipv6_serde.cpp:
##########
@@ -88,5 +89,28 @@ Status 
DataTypeIPv6SerDe::deserialize_one_cell_from_json(IColumn& column, Slice&
     return Status::OK();
 }
 
+Status DataTypeIPv6SerDe::write_column_to_pb(const IColumn& column, PValues& 
result, int start,

Review Comment:
   warning: method 'write_column_to_pb' can be made static 
[readability-convert-member-functions-to-static]
   
   be/src/vec/data_types/serde/data_type_ipv6_serde.cpp:92:
   ```diff
   -                                              int end) const {
   +                                              int end) {
   ```
   
   be/src/vec/data_types/serde/data_type_ipv6_serde.h:55:
   ```diff
   -     Status write_column_to_pb(const IColumn& column, PValues& result, int start,
   -                               int end) const override;
   +     static Status write_column_to_pb(const IColumn& column, PValues& result, int start,
   +                               int end) override;
   ```
   



##########
be/test/vec/data_types/serde/data_type_serde_pb_test.cpp:
##########
@@ -28,53 +29,284 @@
 #include <string>
 #include <vector>
 
+#include "common/status.h"
 #include "gtest/gtest_pred_impl.h"
 #include "olap/hll.h"
 #include "util/bitmap_value.h"
 #include "util/quantile_state.h"
 #include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
 #include "vec/columns/column_complex.h"
 #include "vec/columns/column_decimal.h"
+#include "vec/columns/column_map.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
+#include "vec/columns/column_struct.h"
 #include "vec/columns/column_vector.h"
+#include "vec/columns/columns_number.h"
+#include "vec/core/block.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_bitmap.h"
 #include "vec/data_types/data_type_decimal.h"
 #include "vec/data_types/data_type_hll.h"
 #include "vec/data_types/data_type_ipv4.h"
 #include "vec/data_types/data_type_ipv6.h"
+#include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_number.h"
 #include "vec/data_types/data_type_quantilestate.h"
 #include "vec/data_types/data_type_string.h"
+#include "vec/data_types/data_type_struct.h"
+#include "vec/data_types/data_type_time_v2.h"
 #include "vec/data_types/serde/data_type_serde.h"
 
 namespace doris::vectorized {
 
 inline void column_to_pb(const DataTypePtr data_type, const IColumn& col, 
PValues* result) {
     const DataTypeSerDeSPtr serde = data_type->get_serde();
-    static_cast<void>(serde->write_column_to_pb(col, *result, 0, col.size()));
+    Status st = serde->write_column_to_pb(col, *result, 0, col.size());
+    if (!st.ok()) {
+        std::cerr << "column_to_pb error, maybe not impl it: " << st.msg() << 
" "
+                  << data_type->get_name() << std::endl;
+    }
 }
 
-inline void pb_to_column(const DataTypePtr data_type, PValues& result, 
IColumn& col) {
+inline bool pb_to_column(const DataTypePtr data_type, PValues& result, 
IColumn& col) {
     auto serde = data_type->get_serde();
-    static_cast<void>(serde->read_column_from_pb(col, result));
+    Status st = serde->read_column_from_pb(col, result);
+    if (!st.ok()) {
+        std::cerr << "pb_to_column error, maybe not impl it: " << st.msg() << 
" "
+                  << data_type->get_name() << std::endl;
+        return false;

Review Comment:
   warning: redundant boolean literal in conditional return statement [readability-simplify-boolean-expr]
   
   be/test/vec/data_types/serde/data_type_serde_pb_test.cpp:78:
   ```diff
   -     if (!st.ok()) {
   -         std::cerr << "pb_to_column error, maybe not impl it: " << st.msg() << " "
   -                   << data_type->get_name() << std::endl;
   -         return false;
   -     }
   -     return true;
   +     return st.ok();
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to