HappenLee commented on code in PR #32180:
URL: https://github.com/apache/doris/pull/32180#discussion_r1546019315


##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -34,130 +34,75 @@ class VRuntimeFilterSlots {
 public:
     VRuntimeFilterSlots(
             const std::vector<std::shared_ptr<vectorized::VExprContext>>& 
build_expr_ctxs,
-            const std::vector<IRuntimeFilter*>& runtime_filters, bool 
need_local_merge = false)
-            : _build_expr_context(build_expr_ctxs),
-              _runtime_filters(runtime_filters),
-              _need_local_merge(need_local_merge) {}
-
-    Status init(RuntimeState* state, int64_t hash_table_size) {
-        // runtime filter effect strategy
-        // 1. we will ignore IN filter when hash_table_size is too big
-        // 2. we will ignore BLOOM filter and MinMax filter when 
hash_table_size
-        // is too small and IN filter has effect
-        std::map<int, bool> has_in_filter;
-
-        auto ignore_local_filter = [&](int filter_id) {
-            auto runtime_filter_mgr = _need_local_merge ? 
state->global_runtime_filter_mgr()
-                                                        : 
state->local_runtime_filter_mgr();
-
-            std::vector<IRuntimeFilter*> filters;
-            RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(filter_id, 
filters));
-            if (filters.empty()) {
-                throw Exception(ErrorCode::INTERNAL_ERROR, "filters empty, 
filter_id={}",
-                                filter_id);
-            }
-            for (auto* filter : filters) {
-                filter->set_ignored("");
-                filter->signal();
-            }
-            return Status::OK();
-        };
+            const std::vector<IRuntimeFilter*>& runtime_filters)
+            : _build_expr_context(build_expr_ctxs), 
_runtime_filters(runtime_filters) {
+        for (auto* runtime_filter : _runtime_filters) {
+            
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter);
+        }
+    }
 
-        auto ignore_remote_filter = [](IRuntimeFilter* runtime_filter, 
std::string& msg) {
-            runtime_filter->set_ignored(msg);
-            RETURN_IF_ERROR(runtime_filter->publish());
+    Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, 
bool publish_local,
+                            pipeline::CountedFinishDependency* dependency) {
+        if (_runtime_filters.empty() || publish_local) {
             return Status::OK();
-        };
-
-        // ordered vector: IN, IN_OR_BLOOM, others.
-        // so we can ignore other filter if IN Predicate exists.
-        auto compare_desc = [](IRuntimeFilter* d1, IRuntimeFilter* d2) {
-            if (d1->type() == d2->type()) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_FILTER) {
-                return false;
-            } else if (d1->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return true;
-            } else if (d2->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                return false;
-            } else {
-                return d1->type() < d2->type();
+        }
+        for (auto* runtime_filter : _runtime_filters) {
+            if (runtime_filter->need_sync_filter_size()) {
+                runtime_filter->set_dependency(dependency);
             }
-        };
-        std::sort(_runtime_filters.begin(), _runtime_filters.end(), 
compare_desc);
-
-        // do not create 'in filter' when hash_table size over limit
-        const auto max_in_num = state->runtime_filter_max_in_num();
-        const bool over_max_in_num = (hash_table_size >= max_in_num);
+        }
 
+        // send_filter_size may call dependency->sub(), so we call 
set_dependency firstly for all rf to avoid dependency set_ready repeatedly
         for (auto* runtime_filter : _runtime_filters) {
-            if (runtime_filter->expr_order() < 0 ||
-                runtime_filter->expr_order() >= _build_expr_context.size()) {
-                return Status::InternalError(
-                        "runtime_filter meet invalid expr_order, 
expr_order={}, "
-                        "_build_expr_context.size={}",
-                        runtime_filter->expr_order(), 
_build_expr_context.size());
+            if (runtime_filter->need_sync_filter_size()) {
+                
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
             }
+        }
+        return Status::OK();
+    }
 
-            bool is_in_filter = (runtime_filter->type() == 
RuntimeFilterType::IN_FILTER);
+    static uint64_t get_real_size(IRuntimeFilter* runtime_filter, uint64_t 
hash_table_size) {
+        return runtime_filter->isset_global_size() ? 
runtime_filter->get_global_size()
+                                                   : hash_table_size;
+    }
 
-            if (over_max_in_num &&
-                runtime_filter->type() == 
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
-                RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+    Status ignore_filters(RuntimeState* state, uint64_t hash_table_size) {

Review Comment:
   The parameter `hash_table_size` is useless here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to