This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5b5cbf64afd [Chore](join) add debug info for build_sink::close (#44974)
5b5cbf64afd is described below
commit 5b5cbf64afdb576dcb02ec30a0be477b604c27aa
Author: Pxl <[email protected]>
AuthorDate: Wed Dec 4 18:24:23 2024 +0800
[Chore](join) add debug info for build_sink::close (#44974)
add debug info for build_sink::close
---
be/src/exprs/runtime_filter.cpp | 2 +-
be/src/pipeline/dependency.cpp | 9 +++++----
be/src/pipeline/dependency.h | 22 +++++++++++++++++++---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 13 ++++++++++---
4 files changed, 35 insertions(+), 11 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index 606df9fe980..34dfb8ff8ba 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -1532,7 +1532,7 @@ std::string IRuntimeFilter::debug_string() const {
     return fmt::format(
             "RuntimeFilter: (id = {}, type = {}, is_broadcast: {}, ignored: {}, "
             "build_bf_cardinality: {}, dependency: {}, synced_size: {}, has_local_target: {}, "
-            "has_remote_target: {},error_msg: [{}]",
+            "has_remote_target: {}, error_msg: [{}]",
             _filter_id, to_string(_runtime_filter_type), _is_broadcast_join,
             _wrapper->_context->ignored, _wrapper->get_build_bf_cardinality(),
             _dependency ? _dependency->debug_string() : "none", _synced_size, _has_local_target,
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 5fef018423d..983429f15e2 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -92,10 +92,11 @@ std::string Dependency::debug_string(int indentation_level) {
 std::string CountedFinishDependency::debug_string(int indentation_level) {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer,
-                   "{}{}: id={}, block_task={}, ready={}, _always_ready={}, count={}",
-                   std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(),
-                   _ready, _always_ready, _counter);
+    fmt::format_to(
+            debug_string_buffer,
+            "{}{}: id={}, block_task={}, ready={}, _always_ready={}, count={}, _stack_set_ready={}",
+            std::string(indentation_level * 2, ' '), _name, _node_id, _blocked_task.size(), _ready,
+            _always_ready, _counter, _stack_set_ready);
     return fmt::to_string(debug_string_buffer);
 }
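
For readers skimming the hunk above: CountedFinishDependency builds its debug string into an fmt::memory_buffer with fmt::format_to and now also prints the stack captured at set_ready() time. Below is a minimal, self-contained sketch of that pattern (not Doris code; it assumes a recent {fmt} release, so format_to takes std::back_inserter rather than the buffer directly as the patched code does).

// Minimal sketch (not the real class): building a debug string into an
// fmt::memory_buffer and appending an extra _stack_set_ready field.
#include <fmt/format.h>
#include <iostream>
#include <iterator>
#include <string>

struct DemoDependency {
    std::string _name = "HASH_JOIN_BUILD_FINISH_DEPENDENCY";  // illustrative values only
    int _node_id = 3;
    bool _ready = false;
    int _counter = 1;
    std::string _stack_set_ready;  // filled when set_ready() runs

    std::string debug_string(int indentation_level = 0) const {
        fmt::memory_buffer buf;
        // Newer {fmt} releases require an output iterator here.
        fmt::format_to(std::back_inserter(buf),
                       "{}{}: id={}, ready={}, count={}, _stack_set_ready={}",
                       std::string(indentation_level * 2, ' '), _name, _node_id,
                       _ready, _counter, _stack_set_ready);
        return fmt::to_string(buf);
    }
};

int main() {
    DemoDependency dep;
    dep._stack_set_ready = "<stack trace captured in set_ready()>";
    std::cout << dep.debug_string() << '\n';
}
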
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index f7712625d3e..71ef2d8b8fd 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -34,6 +34,7 @@
#include "pipeline/common/set_utils.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/join/process_hash_table_probe.h"
+#include "util/stack_util.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
#include "vec/core/block.h"
@@ -107,7 +108,7 @@ public:
     // Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
     [[nodiscard]] virtual Dependency* is_blocked_by(PipelineTask* task = nullptr);
// Notify downstream pipeline tasks this dependency is ready.
- void set_ready();
+ virtual void set_ready();
void set_ready_to_read() {
DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
_shared_state->source_deps.front()->set_ready();
@@ -172,11 +173,26 @@ struct FakeSharedState final : public BasicSharedState {
ENABLE_FACTORY_CREATOR(FakeSharedState)
};
-class CountedFinishDependency final : public Dependency {
+class DependencyWithStack : public Dependency {
+public:
+ using SharedState = FakeSharedState;
+    DependencyWithStack(int id, int node_id, std::string name, bool ready = false)
+            : Dependency(id, node_id, name, ready) {}
+
+ void set_ready() override {
+ _stack_set_ready = get_stack_trace();
+ Dependency::set_ready();
+ }
+
+protected:
+ std::string _stack_set_ready;
+};
+
+class CountedFinishDependency final : public DependencyWithStack {
public:
using SharedState = FakeSharedState;
CountedFinishDependency(int id, int node_id, std::string name)
- : Dependency(id, node_id, name, true) {}
+ : DependencyWithStack(id, node_id, name, true) {}
void add() {
std::unique_lock<std::mutex> l(_mtx);
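
The new DependencyWithStack base only does one thing: it remembers where set_ready() was called before delegating to Dependency::set_ready(), so a later debug_string() can show who woke the dependency up. A trimmed-down sketch of that override-and-record pattern follows (get_stack_trace() is stubbed here because util/stack_util.h is Doris-internal).

#include <iostream>
#include <string>

// Placeholder for Doris's util/stack_util.h helper; not the real implementation.
static std::string get_stack_trace() { return "<frames captured here>"; }

class Dependency {
public:
    virtual ~Dependency() = default;
    // Notify downstream pipeline tasks this dependency is ready.
    virtual void set_ready() { _ready = true; }
    bool ready() const { return _ready; }

protected:
    bool _ready = false;
};

class DependencyWithStack : public Dependency {
public:
    // Remember who woke this dependency up, then do the normal wake-up.
    void set_ready() override {
        _stack_set_ready = get_stack_trace();
        Dependency::set_ready();
    }

    const std::string& stack_set_ready() const { return _stack_set_ready; }

protected:
    std::string _stack_set_ready;
};

int main() {
    DependencyWithStack dep;
    dep.set_ready();
    std::cout << "ready=" << dep.ready() << ", stack=" << dep.stack_set_ready() << '\n';
}
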
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index d6129e5af20..8115b87c8fe 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -166,15 +166,22 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
SCOPED_TIMER(_runtime_filter_compute_timer);
_runtime_filter_slots->insert(block);
}
+        } else if (p._shared_hashtable_controller && !p._shared_hash_table_context->signaled) {
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "build_sink::close meet error state, shared_hash_table_signaled: {}, "
+                            "complete_build_stage: {}",
+                            p._shared_hash_table_context->signaled,
+                            p._shared_hash_table_context->complete_build_stage);
         }
SCOPED_TIMER(_publish_runtime_filter_timer);
         RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
} catch (Exception& e) {
return Status::InternalError(
- "rf process meet error: {}, wake_up_by_downstream: {},
should_build_hash_table: {}",
- e.to_string(), state->get_task()->wake_up_by_downstream(),
- _should_build_hash_table);
+ "rf process meet error: {}, wake_up_by_downstream: {},
should_build_hash_table: "
+ "{}, _finish_dependency: {}",
+ e.to_string(), state->get_task()->wake_up_by_downstream(),
_should_build_hash_table,
+ _finish_dependency->debug_string());
}
return Base::close(state, exec_status);
}
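
Taken together, close() now fails loudly when the shared hash table was never signaled, and any exception on the runtime-filter path comes back with the finish dependency's debug string attached, including the new _stack_set_ready field. A simplified sketch of that "attach dependency state to the error" idea follows (the Status/Exception types and names here are stand-ins, not the real Doris API).

#include <fmt/format.h>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

struct DemoStatus {
    std::string msg;  // empty means OK
    static DemoStatus internal_error(std::string m) { return {std::move(m)}; }
    bool ok() const { return msg.empty(); }
};

DemoStatus close_build_sink(bool shared_hash_table_signaled,
                            const std::string& finish_dep_debug) {
    try {
        if (!shared_hash_table_signaled) {
            // Mirrors the new "meet error state" throw in the patch.
            throw std::runtime_error(fmt::format(
                    "build_sink::close meet error state, shared_hash_table_signaled: {}",
                    shared_hash_table_signaled));
        }
        return {};
    } catch (const std::exception& e) {
        // Fold the dependency's debug string into the error so the log shows
        // whether (and from where) the finish dependency was marked ready.
        return DemoStatus::internal_error(fmt::format(
                "rf process meet error: {}, _finish_dependency: {}", e.what(), finish_dep_debug));
    }
}

int main() {
    auto st = close_build_sink(false, "FINISH_DEPENDENCY: id=3, ready=false, _stack_set_ready=");
    if (!st.ok()) std::cout << st.msg << '\n';
}

The payoff is that a stuck or mis-signaled build sink can be diagnosed from the error message alone, without reproducing the hang under a debugger.
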
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]