This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 217e6cef340 [Bug](join) return eof when join build sink awakend by
downstream source #47380 (#47709)
217e6cef340 is described below
commit 217e6cef340cbb26c05f01557aff67e8ae338fa2
Author: Pxl <[email protected]>
AuthorDate: Wed Feb 12 09:41:11 2025 +0800
[Bug](join) return eof when join build sink awakend by downstream source
#47380 (#47709)
pick from #47380
---
be/src/exprs/runtime_filter_slots.h | 3 +++
be/src/pipeline/exec/hashjoin_build_sink.cpp | 11 +++--------
be/src/vec/runtime/shared_hash_table_controller.h | 1 -
3 files changed, 6 insertions(+), 9 deletions(-)
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index c06ac946283..e191ee4d070 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -125,6 +125,9 @@ public:
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
// process IN_OR_BLOOM_FILTER's real type
for (auto* filter : _runtime_filters) {
+ if (filter->get_ignored() || filter->get_disabled()) {
+ continue;
+ }
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
get_real_size(filter, local_hash_table_size) >
state->runtime_filter_max_in_num()) {
RETURN_IF_ERROR(filter->change_to_bloom_filter());
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index c764b8d1a73..d47c6f445d6 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -174,20 +174,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState*
state, Status exec_statu
SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
} catch (Exception& e) {
- bool blocked_by_complete_build_stage = p._shared_hashtable_controller
&&
-
!p._shared_hash_table_context->complete_build_stage;
bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
p._shared_hashtable_controller &&
!p._shared_hash_table_context->signaled;
return Status::InternalError(
"rf process meet error: {}, wake_up_early: {},
should_build_hash_table: "
- "{}, _finish_dependency: {}, blocked_by_complete_build_stage:
{}, "
+ "{}, _finish_dependency: {},"
"blocked_by_shared_hash_table_signal: "
"{}",
e.to_string(), state->get_task()->wake_up_early(),
_should_build_hash_table,
- _finish_dependency->debug_string(),
blocked_by_complete_build_stage,
- blocked_by_shared_hash_table_signal);
+ _finish_dependency->debug_string(),
blocked_by_shared_hash_table_signal);
}
return Base::close(state, exec_status);
}
@@ -653,7 +650,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state.process_build_block(state,
(*local_state._shared_state->build_block)));
if (_shared_hashtable_controller) {
_shared_hash_table_context->status = Status::OK();
- _shared_hash_table_context->complete_build_stage = true;
// arena will be shared with other instances.
_shared_hash_table_context->arena =
local_state._shared_state->arena;
_shared_hash_table_context->hash_table_variants =
@@ -666,8 +662,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
_shared_hashtable_controller->signal(node_id());
}
- } else if (!local_state._should_build_hash_table &&
- _shared_hash_table_context->complete_build_stage) {
+ } else if (!local_state._should_build_hash_table) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
// the instance which is not build hash table, it's should wait the
signal of hash table build finished.
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index ea26333a3aa..da60bb2410c 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -68,7 +68,6 @@ struct SharedHashTableContext {
std::map<int, RuntimeFilterContextSPtr> runtime_filters;
std::atomic<bool> signaled = false;
bool short_circuit_for_null_in_probe_side = false;
- std::atomic<bool> complete_build_stage = false;
};
using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]