This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 76021471269 [Chore](runtime-filter) display
wake_up_by_downstream/should_build_hash_table when build_sink… (#44892)
76021471269 is described below
commit 76021471269a8590ef4780995f8159afbee7fa6d
Author: Pxl <[email protected]>
AuthorDate: Tue Dec 3 17:50:48 2024 +0800
[Chore](runtime-filter) display
wake_up_by_downstream/should_build_hash_table when build_sink… (#44892)
display wake_up_by_downstream/should_build_hash_table when
build_sink::close prosess rf meet error
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 63 +++++++++++++++-------------
1 file changed, 35 insertions(+), 28 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e4c67b24193..d6129e5af20 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -139,36 +139,43 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
return Base::close(state, exec_status);
}
- if (state->get_task()->wake_up_by_downstream()) {
- if (_should_build_hash_table) {
- // partitial ignore rf to make global rf work
- RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0,
_finish_dependency));
- RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
- } else {
- // do not publish filter coz local rf not inited and useless
- return Base::close(state, exec_status);
- }
- } else if (_should_build_hash_table) {
- if (p._shared_hashtable_controller &&
!p._shared_hash_table_context->complete_build_stage) {
- return Status::InternalError("close before sink meet eos");
- }
- auto* block = _shared_state->build_block.get();
- uint64_t hash_table_size = block ? block->rows() : 0;
- {
- SCOPED_TIMER(_runtime_filter_init_timer);
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_filter_slots->init_filters(state,
hash_table_size));
- RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
- }
- if (hash_table_size > 1) {
- SCOPED_TIMER(_runtime_filter_compute_timer);
- _runtime_filter_slots->insert(block);
+ try {
+ if (state->get_task()->wake_up_by_downstream()) {
+ if (_should_build_hash_table) {
+ // partitial ignore rf to make global rf work
+ RETURN_IF_ERROR(
+ _runtime_filter_slots->send_filter_size(state, 0,
_finish_dependency));
+ RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
+ } else {
+ // do not publish filter coz local rf not inited and useless
+ return Base::close(state, exec_status);
+ }
+ } else if (_should_build_hash_table) {
+ if (p._shared_hashtable_controller &&
+ !p._shared_hash_table_context->complete_build_stage) {
+ return Status::InternalError("close before sink meet eos");
+ }
+ auto* block = _shared_state->build_block.get();
+ uint64_t hash_table_size = block ? block->rows() : 0;
+ {
+ SCOPED_TIMER(_runtime_filter_init_timer);
+ RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state,
hash_table_size));
+ RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+ }
+ if (hash_table_size > 1) {
+ SCOPED_TIMER(_runtime_filter_compute_timer);
+ _runtime_filter_slots->insert(block);
+ }
}
- }
- SCOPED_TIMER(_publish_runtime_filter_timer);
- RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
- _runtime_filter_slots->publish(state, !_should_build_hash_table));
+ SCOPED_TIMER(_publish_runtime_filter_timer);
+ RETURN_IF_ERROR(_runtime_filter_slots->publish(state,
!_should_build_hash_table));
+ } catch (Exception& e) {
+ return Status::InternalError(
+ "rf process meet error: {}, wake_up_by_downstream: {},
should_build_hash_table: {}",
+ e.to_string(), state->get_task()->wake_up_by_downstream(),
+ _should_build_hash_table);
+ }
return Base::close(state, exec_status);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]