github-actions[bot] commented on code in PR #32180:
URL: https://github.com/apache/doris/pull/32180#discussion_r1524641095
##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -39,7 +40,32 @@ class VRuntimeFilterSlots {
_runtime_filters(runtime_filters),
_need_local_merge(need_local_merge) {}
- Status init(RuntimeState* state, int64_t hash_table_size) {
+ Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool
publish_local) {
Review Comment:
warning: method 'send_filter_size' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status send_filter_size(RuntimeState* state, int64_t
hash_table_size, bool publish_local) {
```
##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -39,7 +40,32 @@
_runtime_filters(runtime_filters),
_need_local_merge(need_local_merge) {}
- Status init(RuntimeState* state, int64_t hash_table_size) {
+ Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool
publish_local) {
+ const auto max_in_num = state->runtime_filter_max_in_num();
+ const bool over_max_in_num = (hash_table_size >= max_in_num);
+ for (auto* runtime_filter : _runtime_filters) {
+ if (over_max_in_num &&
+ runtime_filter->type() ==
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+ }
+
+ if (runtime_filter->is_bloomfilter() && !publish_local &&
+ runtime_filter->need_sync_filter_size()) {
+
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+ }
+ }
+ return Status::OK();
+ }
+
+ void init_dependency(pipeline::CountedFinishDependency* dependency) {
+ for (auto* runtime_filter : _runtime_filters) {
+ if (runtime_filter->is_bloomfilter() &&
runtime_filter->need_sync_filter_size()) {
+ runtime_filter->set_dependency(dependency);
+ }
+ }
+ }
+
+ Status init(RuntimeState* state, int64_t hash_table_size, bool
publish_local) {
Review Comment:
warning: function 'init' exceeds recommended size/complexity thresholds
[readability-function-size]
```cpp
Status init(RuntimeState* state, int64_t hash_table_size, bool
publish_local) {
^
```
<details>
<summary>Additional context</summary>
**be/src/exprs/runtime_filter_slots.h:67:** 124 lines including whitespace
and comments (threshold 80)
```cpp
Status init(RuntimeState* state, int64_t hash_table_size, bool
publish_local) {
^
```
</details>
##########
be/src/exprs/runtime_filter_slots.h:
##########
@@ -39,7 +40,32 @@
_runtime_filters(runtime_filters),
_need_local_merge(need_local_merge) {}
- Status init(RuntimeState* state, int64_t hash_table_size) {
+ Status send_filter_size(RuntimeState* state, int64_t hash_table_size, bool
publish_local) {
+ const auto max_in_num = state->runtime_filter_max_in_num();
+ const bool over_max_in_num = (hash_table_size >= max_in_num);
+ for (auto* runtime_filter : _runtime_filters) {
+ if (over_max_in_num &&
+ runtime_filter->type() ==
RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ RETURN_IF_ERROR(runtime_filter->change_to_bloom_filter());
+ }
+
+ if (runtime_filter->is_bloomfilter() && !publish_local &&
+ runtime_filter->need_sync_filter_size()) {
+
RETURN_IF_ERROR(runtime_filter->send_filter_size(hash_table_size));
+ }
+ }
+ return Status::OK();
+ }
+
+ void init_dependency(pipeline::CountedFinishDependency* dependency) {
+ for (auto* runtime_filter : _runtime_filters) {
+ if (runtime_filter->is_bloomfilter() &&
runtime_filter->need_sync_filter_size()) {
+ runtime_filter->set_dependency(dependency);
+ }
+ }
+ }
+
+ Status init(RuntimeState* state, int64_t hash_table_size, bool
publish_local) {
Review Comment:
warning: method 'init' can be made const
[readability-make-member-function-const]
```suggestion
Status init(RuntimeState* state, int64_t hash_table_size, bool
publish_local) const {
```
##########
be/src/runtime/runtime_filter_mgr.h:
##########
@@ -20,8 +20,10 @@
#include <gen_cpp/PaloInternalService_types.h>
Review Comment:
warning: 'gen_cpp/PaloInternalService_types.h' file not found
[clang-diagnostic-error]
```cpp
#include <gen_cpp/PaloInternalService_types.h>
^
```
##########
be/src/runtime/runtime_filter_mgr.cpp:
##########
@@ -303,6 +305,69 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId
query_id,
return Status::OK();
}
+Status RuntimeFilterMergeControllerEntity::send_filter_size(const
PSendFilterSizeRequest* request) {
Review Comment:
warning: method 'send_filter_size' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status RuntimeFilterMergeControllerEntity::send_filter_size(const
PSendFilterSizeRequest* request) {
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1357,71 +1357,117 @@
bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
int64_t start_apply = MonotonicMillis();
+ std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+ std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+ RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+ ObjectPool* pool = nullptr;
+
const auto& fragment_instance_ids = request->fragment_instance_ids();
- if (!fragment_instance_ids.empty()) {
- UniqueId fragment_instance_id = fragment_instance_ids[0];
- TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
- std::shared_ptr<PlanFragmentExecutor> fragment_executor;
- std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-
- RuntimeFilterMgr* runtime_filter_mgr = nullptr;
- ObjectPool* pool = nullptr;
- if (is_pipeline) {
- std::unique_lock<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find(tfragment_instance_id);
- if (iter == _pipeline_map.end()) {
- VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
- return Status::InvalidArgument("fragment-id: {}",
fragment_instance_id.to_string());
- }
- pip_context = iter->second;
+ {
+ std::unique_lock<std::mutex> lock(_lock);
+ for (UniqueId fragment_instance_id : fragment_instance_ids) {
+ TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
- DCHECK(pip_context != nullptr);
- runtime_filter_mgr =
pip_context->get_query_ctx()->runtime_filter_mgr();
- pool = &pip_context->get_query_ctx()->obj_pool;
- } else {
- std::unique_lock<std::mutex> lock(_lock);
- auto iter = _fragment_instance_map.find(tfragment_instance_id);
- if (iter == _fragment_instance_map.end()) {
- VLOG_CRITICAL << "unknown.... fragment instance id:"
- << print_id(tfragment_instance_id);
- return Status::InvalidArgument("fragment instance id: {}",
-
print_id(tfragment_instance_id));
- }
- fragment_executor = iter->second;
+ if (is_pipeline) {
+ auto iter = _pipeline_map.find(tfragment_instance_id);
+ if (iter == _pipeline_map.end()) {
+ continue;
+ }
+ pip_context = iter->second;
- DCHECK(fragment_executor != nullptr);
- runtime_filter_mgr =
fragment_executor->get_query_ctx()->runtime_filter_mgr();
- pool = &fragment_executor->get_query_ctx()->obj_pool;
- }
+ DCHECK(pip_context != nullptr);
+ runtime_filter_mgr =
pip_context->get_query_ctx()->runtime_filter_mgr();
+ pool = &pip_context->get_query_ctx()->obj_pool;
+ } else {
+ auto iter = _fragment_instance_map.find(tfragment_instance_id);
+ if (iter == _fragment_instance_map.end()) {
+ continue;
+ }
+ fragment_executor = iter->second;
- // 1. get the target filters
- std::vector<IRuntimeFilter*> filters;
-
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(),
filters));
-
- // 2. create the filter wrapper to replace or ignore the target filters
- if (request->has_in_filter() &&
request->in_filter().has_ignored_msg()) {
- const auto& in_filter = request->in_filter();
-
- std::ranges::for_each(filters, [&in_filter](auto& filter) {
- filter->set_ignored(in_filter.ignored_msg());
- filter->signal();
- });
- } else if (!filters.empty()) {
- UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
- filters[0]->column_type()};
- RuntimePredicateWrapper* filter_wrapper = nullptr;
- RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params,
&filter_wrapper));
-
- std::ranges::for_each(filters, [&](auto& filter) {
- filter->update_filter(filter_wrapper, request->merge_time(),
start_apply);
- });
+ DCHECK(fragment_executor != nullptr);
+ runtime_filter_mgr =
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+ pool = &fragment_executor->get_query_ctx()->obj_pool;
+ }
+ break;
}
}
+ if (runtime_filter_mgr == nullptr) {
+ // all instance finished
+ return Status::OK();
+ }
+
+ // 1. get the target filters
+ std::vector<IRuntimeFilter*> filters;
+
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(),
filters));
+
+ // 2. create the filter wrapper to replace or ignore the target filters
+ if (request->has_in_filter() && request->in_filter().has_ignored_msg()) {
+ const auto& in_filter = request->in_filter();
+
+ std::ranges::for_each(filters, [&in_filter](auto& filter) {
+ filter->set_ignored(in_filter.ignored_msg());
+ filter->signal();
+ });
+ } else if (!filters.empty()) {
+ UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
filters[0]->column_type()};
+ RuntimePredicateWrapper* filter_wrapper = nullptr;
+ RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params,
&filter_wrapper));
+
+ std::ranges::for_each(filters, [&](auto& filter) {
+ filter->update_filter(filter_wrapper, request->merge_time(),
start_apply);
+ });
+ }
+
return Status::OK();
}
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
+ UniqueId queryid = request->query_id();
+ std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
+ RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid,
&filter_controller));
+
+ std::shared_ptr<QueryContext> query_ctx;
+ {
+ TUniqueId query_id;
+ query_id.__set_hi(queryid.hi);
+ query_id.__set_lo(queryid.lo);
+ std::lock_guard<std::mutex> lock(_lock);
+ auto iter = _query_ctx_map.find(query_id);
+ if (iter == _query_ctx_map.end()) {
+ return Status::InvalidArgument("query-id: {}",
queryid.to_string());
+ }
+
+ query_ctx = iter->second;
+ }
+ auto merge_status = filter_controller->send_filter_size(request);
+ return merge_status;
+}
+
+Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
Review Comment:
warning: method 'sync_filter_size' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest*
request) {
```
##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1357,71 +1357,117 @@ Status FragmentMgr::apply_filterv2(const
PPublishFilterRequestV2* request,
bool is_pipeline = request->has_is_pipeline() && request->is_pipeline();
int64_t start_apply = MonotonicMillis();
+ std::shared_ptr<PlanFragmentExecutor> fragment_executor;
+ std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
+
+ RuntimeFilterMgr* runtime_filter_mgr = nullptr;
+ ObjectPool* pool = nullptr;
+
const auto& fragment_instance_ids = request->fragment_instance_ids();
- if (!fragment_instance_ids.empty()) {
- UniqueId fragment_instance_id = fragment_instance_ids[0];
- TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
-
- std::shared_ptr<PlanFragmentExecutor> fragment_executor;
- std::shared_ptr<pipeline::PipelineFragmentContext> pip_context;
-
- RuntimeFilterMgr* runtime_filter_mgr = nullptr;
- ObjectPool* pool = nullptr;
- if (is_pipeline) {
- std::unique_lock<std::mutex> lock(_lock);
- auto iter = _pipeline_map.find(tfragment_instance_id);
- if (iter == _pipeline_map.end()) {
- VLOG_CRITICAL << "unknown.... fragment-id:" <<
fragment_instance_id;
- return Status::InvalidArgument("fragment-id: {}",
fragment_instance_id.to_string());
- }
- pip_context = iter->second;
+ {
+ std::unique_lock<std::mutex> lock(_lock);
+ for (UniqueId fragment_instance_id : fragment_instance_ids) {
+ TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift();
- DCHECK(pip_context != nullptr);
- runtime_filter_mgr =
pip_context->get_query_ctx()->runtime_filter_mgr();
- pool = &pip_context->get_query_ctx()->obj_pool;
- } else {
- std::unique_lock<std::mutex> lock(_lock);
- auto iter = _fragment_instance_map.find(tfragment_instance_id);
- if (iter == _fragment_instance_map.end()) {
- VLOG_CRITICAL << "unknown.... fragment instance id:"
- << print_id(tfragment_instance_id);
- return Status::InvalidArgument("fragment instance id: {}",
-
print_id(tfragment_instance_id));
- }
- fragment_executor = iter->second;
+ if (is_pipeline) {
+ auto iter = _pipeline_map.find(tfragment_instance_id);
+ if (iter == _pipeline_map.end()) {
+ continue;
+ }
+ pip_context = iter->second;
- DCHECK(fragment_executor != nullptr);
- runtime_filter_mgr =
fragment_executor->get_query_ctx()->runtime_filter_mgr();
- pool = &fragment_executor->get_query_ctx()->obj_pool;
- }
+ DCHECK(pip_context != nullptr);
+ runtime_filter_mgr =
pip_context->get_query_ctx()->runtime_filter_mgr();
+ pool = &pip_context->get_query_ctx()->obj_pool;
+ } else {
+ auto iter = _fragment_instance_map.find(tfragment_instance_id);
+ if (iter == _fragment_instance_map.end()) {
+ continue;
+ }
+ fragment_executor = iter->second;
- // 1. get the target filters
- std::vector<IRuntimeFilter*> filters;
-
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(),
filters));
-
- // 2. create the filter wrapper to replace or ignore the target filters
- if (request->has_in_filter() &&
request->in_filter().has_ignored_msg()) {
- const auto& in_filter = request->in_filter();
-
- std::ranges::for_each(filters, [&in_filter](auto& filter) {
- filter->set_ignored(in_filter.ignored_msg());
- filter->signal();
- });
- } else if (!filters.empty()) {
- UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
- filters[0]->column_type()};
- RuntimePredicateWrapper* filter_wrapper = nullptr;
- RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params,
&filter_wrapper));
-
- std::ranges::for_each(filters, [&](auto& filter) {
- filter->update_filter(filter_wrapper, request->merge_time(),
start_apply);
- });
+ DCHECK(fragment_executor != nullptr);
+ runtime_filter_mgr =
fragment_executor->get_query_ctx()->runtime_filter_mgr();
+ pool = &fragment_executor->get_query_ctx()->obj_pool;
+ }
+ break;
}
}
+ if (runtime_filter_mgr == nullptr) {
+ // all instance finished
+ return Status::OK();
+ }
+
+ // 1. get the target filters
+ std::vector<IRuntimeFilter*> filters;
+
RETURN_IF_ERROR(runtime_filter_mgr->get_consume_filters(request->filter_id(),
filters));
+
+ // 2. create the filter wrapper to replace or ignore the target filters
+ if (request->has_in_filter() && request->in_filter().has_ignored_msg()) {
+ const auto& in_filter = request->in_filter();
+
+ std::ranges::for_each(filters, [&in_filter](auto& filter) {
+ filter->set_ignored(in_filter.ignored_msg());
+ filter->signal();
+ });
+ } else if (!filters.empty()) {
+ UpdateRuntimeFilterParamsV2 params {request, attach_data, pool,
filters[0]->column_type()};
+ RuntimePredicateWrapper* filter_wrapper = nullptr;
+ RETURN_IF_ERROR(IRuntimeFilter::create_wrapper(&params,
&filter_wrapper));
+
+ std::ranges::for_each(filters, [&](auto& filter) {
+ filter->update_filter(filter_wrapper, request->merge_time(),
start_apply);
+ });
+ }
+
return Status::OK();
}
+Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
Review Comment:
warning: method 'send_filter_size' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest*
request) {
```
##########
be/src/runtime/runtime_filter_mgr.h:
##########
@@ -79,6 +84,15 @@
Status get_consume_filters(const int filter_id,
std::vector<IRuntimeFilter*>& consumer_filters);
+ IRuntimeFilter* try_get_product_filter(const int filter_id) {
Review Comment:
warning: method 'try_get_product_filter' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static IRuntimeFilter* try_get_product_filter(const int filter_id) {
```
##########
be/src/service/internal_service.cpp:
##########
@@ -1167,9 +1167,36 @@
auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
Status st = _exec_env->fragment_mgr()->merge_filter(request,
&zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "merge meet error" << st.to_string();
- }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ offer_failed(response, done, _light_work_pool);
+ return;
+ }
+}
+
+void PInternalService::send_filter_size(::google::protobuf::RpcController*
controller,
+ const ::doris::PSendFilterSizeRequest*
request,
+ ::doris::PSendFilterSizeResponse*
response,
+ ::google::protobuf::Closure* done) {
+ bool ret = _light_work_pool.try_offer([this, request, response, done]() {
+ brpc::ClosureGuard closure_guard(done);
+ Status st = _exec_env->fragment_mgr()->send_filter_size(request);
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ offer_failed(response, done, _light_work_pool);
+ return;
+ }
+}
+
+void PInternalService::sync_filter_size(::google::protobuf::RpcController*
controller,
Review Comment:
warning: method 'sync_filter_size' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void
PInternalService::sync_filter_size(::google::protobuf::RpcController*
controller,
```
##########
be/src/service/internal_service.cpp:
##########
@@ -1167,9 +1167,36 @@ void
PInternalService::merge_filter(::google::protobuf::RpcController* controlle
auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
Status st = _exec_env->fragment_mgr()->merge_filter(request,
&zero_copy_input_stream);
- if (!st.ok()) {
- LOG(WARNING) << "merge meet error" << st.to_string();
- }
+ st.to_protobuf(response->mutable_status());
+ });
+ if (!ret) {
+ offer_failed(response, done, _light_work_pool);
+ return;
+ }
+}
+
+void PInternalService::send_filter_size(::google::protobuf::RpcController*
controller,
Review Comment:
warning: method 'send_filter_size' can be made static
[readability-convert-member-functions-to-static]
```suggestion
static void
PInternalService::send_filter_size(::google::protobuf::RpcController*
controller,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]