This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b5cbf64afd [Chore](join) add debug info for build_sink::close (#44974)
5b5cbf64afd is described below

commit 5b5cbf64afdb576dcb02ec30a0be477b604c27aa
Author: Pxl <[email protected]>
AuthorDate: Wed Dec 4 18:24:23 2024 +0800

    [Chore](join) add debug info for build_sink::close (#44974)
    
    add debug info for build_sink::close
---
 be/src/exprs/runtime_filter.cpp              |  2 +-
 be/src/pipeline/dependency.cpp               |  9 +++++----
 be/src/pipeline/dependency.h                 | 22 +++++++++++++++++++---
 be/src/pipeline/exec/hashjoin_build_sink.cpp | 13 ++++++++++---
 4 files changed, 35 insertions(+), 11 deletions(-)

diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 606df9fe980..34dfb8ff8ba 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1532,7 +1532,7 @@ std::string IRuntimeFilter::debug_string() const {
     return fmt::format(
             "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: 
{}, "
             "build_bf_cardinality: {}, dependency: {}, synced_size: {}, 
has_local_target: {}, "
-            "has_remote_target: {},error_msg: [{}]",
+            "has_remote_target: {}, error_msg: [{}]",
             _filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
             _wrapper->_context->ignored, _wrapper->get_build_bf_cardinality(),
             _dependency ? _dependency->debug_string() : "none", _synced_size, 
_has_local_target,
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 5fef018423d..983429f15e2 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -92,10 +92,11 @@ std::string Dependency::debug_string(int indentation_level) 
{
 
 std::string CountedFinishDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block_task={}, ready={}, _always_ready={}, 
count={}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(),
-                   _ready, _always_ready, _counter);
+    fmt::format_to(
+            debug_string_buffer,
+            "{}{}: id={}, block_task={}, ready={}, _always_ready={}, count={}, 
_stack_set_ready={}",
+            std::string(indentation_level * 2, ' '), _name, _node_id, 
_blocked_task.size(), _ready,
+            _always_ready, _counter, _stack_set_ready);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index f7712625d3e..71ef2d8b8fd 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -34,6 +34,7 @@
 #include "pipeline/common/set_utils.h"
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/join/process_hash_table_probe.h"
+#include "util/stack_util.h"
 #include "vec/common/sort/partition_sorter.h"
 #include "vec/common/sort/sorter.h"
 #include "vec/core/block.h"
@@ -107,7 +108,7 @@ public:
     // Which dependency current pipeline task is blocked by. `nullptr` if this 
dependency is ready.
     [[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = 
nullptr);
     // Notify downstream pipeline tasks this dependency is ready.
-    void set_ready();
+    virtual void set_ready();
     void set_ready_to_read() {
         DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
         _shared_state->source_deps.front()->set_ready();
@@ -172,11 +173,26 @@ struct FakeSharedState final : public BasicSharedState {
     ENABLE_FACTORY_CREATOR(FakeSharedState)
 };
 
-class CountedFinishDependency final : public Dependency {
+class DependencyWithStack : public Dependency {
+public:
+    using SharedState = FakeSharedState;
+    DependencyWithStack(int id, int node_id, std::string name, bool ready = 
false)
+            : Dependency(id, node_id, name, ready) {}
+
+    void set_ready() override {
+        _stack_set_ready = get_stack_trace();
+        Dependency::set_ready();
+    }
+
+protected:
+    std::string _stack_set_ready;
+};
+
+class CountedFinishDependency final : public DependencyWithStack {
 public:
     using SharedState = FakeSharedState;
     CountedFinishDependency(int id, int node_id, std::string name)
-            : Dependency(id, node_id, name, true) {}
+            : DependencyWithStack(id, node_id, name, true) {}
 
     void add() {
         std::unique_lock<std::mutex> l(_mtx);
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index d6129e5af20..8115b87c8fe 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -166,15 +166,22 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* 
state, Status exec_statu
                 SCOPED_TIMER(_runtime_filter_compute_timer);
                 _runtime_filter_slots->insert(block);
             }
+        } else if (p._shared_hashtable_controller && 
!p._shared_hash_table_context->signaled) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "build_sink::close meet error state, 
shared_hash_table_signaled: {}, "
+                            "complete_build_stage: {}",
+                            p._shared_hash_table_context->signaled,
+                            
p._shared_hash_table_context->complete_build_stage);
         }
 
         SCOPED_TIMER(_publish_runtime_filter_timer);
         RETURN_IF_ERROR(_runtime_filter_slots->publish(state, 
!_should_build_hash_table));
     } catch (Exception& e) {
         return Status::InternalError(
-                "rf process meet error: {}, wake_up_by_downstream: {}, 
should_build_hash_table: {}",
-                e.to_string(), state->get_task()->wake_up_by_downstream(),
-                _should_build_hash_table);
+                "rf process meet error: {}, wake_up_by_downstream: {}, 
should_build_hash_table: "
+                "{}, _finish_dependency: {}",
+                e.to_string(), state->get_task()->wake_up_by_downstream(), 
_should_build_hash_table,
+                _finish_dependency->debug_string());
     }
     return Base::close(state, exec_status);
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to