This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 76021471269 [Chore](runtime-filter) display 
wake_up_by_downstream/should_build_hash_table when build_sink… (#44892)
76021471269 is described below

commit 76021471269a8590ef4780995f8159afbee7fa6d
Author: Pxl <[email protected]>
AuthorDate: Tue Dec 3 17:50:48 2024 +0800

    [Chore](runtime-filter) display 
wake_up_by_downstream/should_build_hash_table when build_sink… (#44892)
    
    display wake_up_by_downstream/should_build_hash_table when 
build_sink::close meets an error while processing rf
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp | 63 +++++++++++++++-------------
 1 file changed, 35 insertions(+), 28 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e4c67b24193..d6129e5af20 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -139,36 +139,43 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
         return Base::close(state, exec_status);
     }
 
-    if (state->get_task()->wake_up_by_downstream()) {
-        if (_should_build_hash_table) {
-            // partitial ignore rf to make global rf work
-            RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, 
_finish_dependency));
-            RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
-        } else {
-            // do not publish filter coz local rf not inited and useless
-            return Base::close(state, exec_status);
-        }
-    } else if (_should_build_hash_table) {
-        if (p._shared_hashtable_controller && 
!p._shared_hash_table_context->complete_build_stage) {
-            return Status::InternalError("close before sink meet eos");
-        }
-        auto* block = _shared_state->build_block.get();
-        uint64_t hash_table_size = block ? block->rows() : 0;
-        {
-            SCOPED_TIMER(_runtime_filter_init_timer);
-            RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-                    _runtime_filter_slots->init_filters(state, 
hash_table_size));
-            RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
-        }
-        if (hash_table_size > 1) {
-            SCOPED_TIMER(_runtime_filter_compute_timer);
-            _runtime_filter_slots->insert(block);
+    try {
+        if (state->get_task()->wake_up_by_downstream()) {
+            if (_should_build_hash_table) {
+                // partitial ignore rf to make global rf work
+                RETURN_IF_ERROR(
+                        _runtime_filter_slots->send_filter_size(state, 0, 
_finish_dependency));
+                RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
+            } else {
+                // do not publish filter coz local rf not inited and useless
+                return Base::close(state, exec_status);
+            }
+        } else if (_should_build_hash_table) {
+            if (p._shared_hashtable_controller &&
+                !p._shared_hash_table_context->complete_build_stage) {
+                return Status::InternalError("close before sink meet eos");
+            }
+            auto* block = _shared_state->build_block.get();
+            uint64_t hash_table_size = block ? block->rows() : 0;
+            {
+                SCOPED_TIMER(_runtime_filter_init_timer);
+                RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, 
hash_table_size));
+                RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
+            }
+            if (hash_table_size > 1) {
+                SCOPED_TIMER(_runtime_filter_compute_timer);
+                _runtime_filter_slots->insert(block);
+            }
         }
-    }
 
-    SCOPED_TIMER(_publish_runtime_filter_timer);
-    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
-            _runtime_filter_slots->publish(state, !_should_build_hash_table));
+        SCOPED_TIMER(_publish_runtime_filter_timer);
+        RETURN_IF_ERROR(_runtime_filter_slots->publish(state, 
!_should_build_hash_table));
+    } catch (Exception& e) {
+        return Status::InternalError(
+                "rf process meet error: {}, wake_up_by_downstream: {}, 
should_build_hash_table: {}",
+                e.to_string(), state->get_task()->wake_up_by_downstream(),
+                _should_build_hash_table);
+    }
     return Base::close(state, exec_status);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to