This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 217e6cef340  [Bug](join) return eof when join build sink awakened by 
downstream source #47380  (#47709)
217e6cef340 is described below

commit 217e6cef340cbb26c05f01557aff67e8ae338fa2
Author: Pxl <[email protected]>
AuthorDate: Wed Feb 12 09:41:11 2025 +0800

     [Bug](join) return eof when join build sink awakened by downstream source 
#47380  (#47709)
    
    pick from #47380
---
 be/src/exprs/runtime_filter_slots.h               |  3 +++
 be/src/pipeline/exec/hashjoin_build_sink.cpp      | 11 +++--------
 be/src/vec/runtime/shared_hash_table_controller.h |  1 -
 3 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/be/src/exprs/runtime_filter_slots.h 
b/be/src/exprs/runtime_filter_slots.h
index c06ac946283..e191ee4d070 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -125,6 +125,9 @@ public:
     Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
         // process IN_OR_BLOOM_FILTER's real type
         for (auto* filter : _runtime_filters) {
+            if (filter->get_ignored() || filter->get_disabled()) {
+                continue;
+            }
             if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
                 get_real_size(filter, local_hash_table_size) > 
state->runtime_filter_max_in_num()) {
                 RETURN_IF_ERROR(filter->change_to_bloom_filter());
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index c764b8d1a73..d47c6f445d6 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -174,20 +174,17 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
         SCOPED_TIMER(_publish_runtime_filter_timer);
         
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
     } catch (Exception& e) {
-        bool blocked_by_complete_build_stage = p._shared_hashtable_controller 
&&
-                                               
!p._shared_hash_table_context->complete_build_stage;
         bool blocked_by_shared_hash_table_signal = !_should_build_hash_table &&
                                                    
p._shared_hashtable_controller &&
                                                    
!p._shared_hash_table_context->signaled;
 
         return Status::InternalError(
                 "rf process meet error: {}, wake_up_early: {}, 
should_build_hash_table: "
-                "{}, _finish_dependency: {}, blocked_by_complete_build_stage: 
{}, "
+                "{}, _finish_dependency: {},"
                 "blocked_by_shared_hash_table_signal: "
                 "{}",
                 e.to_string(), state->get_task()->wake_up_early(), 
_should_build_hash_table,
-                _finish_dependency->debug_string(), 
blocked_by_complete_build_stage,
-                blocked_by_shared_hash_table_signal);
+                _finish_dependency->debug_string(), 
blocked_by_shared_hash_table_signal);
     }
     return Base::close(state, exec_status);
 }
@@ -653,7 +650,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
                 local_state.process_build_block(state, 
(*local_state._shared_state->build_block)));
         if (_shared_hashtable_controller) {
             _shared_hash_table_context->status = Status::OK();
-            _shared_hash_table_context->complete_build_stage = true;
             // arena will be shared with other instances.
             _shared_hash_table_context->arena = 
local_state._shared_state->arena;
             _shared_hash_table_context->hash_table_variants =
@@ -666,8 +662,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* 
state, vectorized::Block*
             
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
             _shared_hashtable_controller->signal(node_id());
         }
-    } else if (!local_state._should_build_hash_table &&
-               _shared_hash_table_context->complete_build_stage) {
+    } else if (!local_state._should_build_hash_table) {
         DCHECK(_shared_hashtable_controller != nullptr);
         DCHECK(_shared_hash_table_context != nullptr);
         // the instance which is not build hash table, it's should wait the 
signal of hash table build finished.
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h 
b/be/src/vec/runtime/shared_hash_table_controller.h
index ea26333a3aa..da60bb2410c 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -68,7 +68,6 @@ struct SharedHashTableContext {
     std::map<int, RuntimeFilterContextSPtr> runtime_filters;
     std::atomic<bool> signaled = false;
     bool short_circuit_for_null_in_probe_side = false;
-    std::atomic<bool> complete_build_stage = false;
 };
 
 using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to