This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7e0b808622d0aa4ae73f9791566a1e438ce8737d Author: Pxl <[email protected]> AuthorDate: Wed Apr 17 11:11:03 2024 +0800 [Improvement](runtime-filter) make sync rf size work when need_local_merge (#33717) make sync rf size work when need_local_merge --- be/src/exprs/runtime_filter.cpp | 5 +++-- be/src/exprs/runtime_filter_slots.h | 4 ++-- be/src/pipeline/exec/hashjoin_build_sink.cpp | 4 +--- be/src/runtime/runtime_state.cpp | 4 ++-- 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 50105d56068..174ce8f3fe5 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1309,9 +1309,10 @@ void IRuntimeFilter::set_dependency(pipeline::CountedFinishDependency* dependenc } void IRuntimeFilter::set_synced_size(uint64_t global_size) { - CHECK(_dependency); _synced_size = global_size; - _dependency->sub(); + if (_dependency) { + _dependency->sub(); + } } void IRuntimeFilter::set_ignored() { diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 2a44c6a3745..8d865dec987 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -41,9 +41,9 @@ public: } } - Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, bool publish_local, + Status send_filter_size(RuntimeState* state, uint64_t hash_table_size, pipeline::CountedFinishDependency* dependency) { - if (_runtime_filters.empty() || publish_local) { + if (_runtime_filters.empty()) { return Status::OK(); } for (auto* runtime_filter : _runtime_filters) { diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index d4dc1956400..f0ff99f0e3d 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -537,10 +537,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* local_state._shared_state->build_block = std::make_shared<vectorized::Block>( local_state._build_side_mutable_block.to_block()); - const bool need_local_merge = - local_state._parent->cast<HashJoinBuildSinkOperatorX>()._need_local_merge; RETURN_IF_ERROR(local_state._runtime_filter_slots->send_filter_size( - state, local_state._shared_state->build_block->rows(), need_local_merge, + state, local_state._shared_state->build_block->rows(), (CountedFinishDependency*)(local_state._finish_dependency.get()))); RETURN_IF_ERROR( local_state.process_build_block(state, (*local_state._shared_state->build_block))); diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 2d9d939186d..aab31fa02ea 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -536,10 +536,10 @@ Status RuntimeState::register_producer_runtime_filter(const doris::TRuntimeFilte // So if `need_local_merge` is true, we will disable `build_bf_exactly`. if (desc.has_remote_targets || need_local_merge) { return global_runtime_filter_mgr()->register_local_merge_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly && !need_local_merge); + desc, query_options(), producer_filter, build_bf_exactly); } else { return local_runtime_filter_mgr()->register_producer_filter( - desc, query_options(), producer_filter, build_bf_exactly && !need_local_merge); + desc, query_options(), producer_filter, build_bf_exactly); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
