This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3e819ab64b0 [minor](task) Complete debug string if task is finalized (#49950)
3e819ab64b0 is described below
commit 3e819ab64b00a52ce622aa315f6fc81fe92f692e
Author: Gabriel <[email protected]>
AuthorDate: Fri Apr 11 17:29:18 2025 +0800
[minor](task) Complete debug string if task is finalized (#49950)
1. Add spill dependencies to the dummy operator local states so that the BE unit tests can run.
2. Include the pipeline name in the task's debug string when the task is finalized.
---
be/src/pipeline/exec/operator.h | 6 +++++
be/src/pipeline/pipeline.h | 1 +
be/src/pipeline/pipeline_task.cpp | 43 +++++++++++++++------------------
be/src/pipeline/pipeline_task.h | 1 +
be/test/pipeline/pipeline_task_test.cpp | 4 +--
5 files changed, 29 insertions(+), 26 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0625bcc0ad0..75a767aaa83 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -1100,12 +1100,15 @@ public:
"DummyOperatorDependency", true);
_filter_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"DummyOperatorDependency", true);
+ _spill_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"DummyOperatorDependency", true);
}
Dependency* finishdependency() override { return _finish_dependency.get();
}
~DummyOperatorLocalState() = default;
std::vector<Dependency*> dependencies() const override { return
{_tmp_dependency.get()}; }
std::vector<Dependency*> filter_dependencies() override { return
{_filter_dependency.get()}; }
+ Dependency* spill_dependency() const override { return
_spill_dependency.get(); }
private:
std::shared_ptr<Dependency> _tmp_dependency;
@@ -1145,10 +1148,13 @@ public:
"DummyOperatorDependency",
true);
_finish_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"DummyOperatorDependency", true);
+ _spill_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
+
"DummyOperatorDependency", true);
}
std::vector<Dependency*> dependencies() const override { return
{_tmp_dependency.get()}; }
Dependency* finishdependency() override { return _finish_dependency.get();
}
+ Dependency* spill_dependency() const override { return
_spill_dependency.get(); }
bool is_finished() const override { return _is_finished; }
private:
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 7bde9323e94..7fff24cf8d9 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -138,6 +138,7 @@ public:
}
int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+ std::string& name() { return _name; }
private:
void _init_profile();
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 94c5c6f7c75..852174137de 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -79,8 +79,8 @@ PipelineTask::PipelineTask(PipelinePtr& pipeline, uint32_t
task_id, RuntimeState
_shared_state_map(std::move(shared_state_map)),
_task_idx(task_idx),
_execution_dep(state->get_query_ctx()->get_execution_dependency()),
- _memory_sufficient_dependency(
- state->get_query_ctx()->get_memory_sufficient_dependency()) {
+
_memory_sufficient_dependency(state->get_query_ctx()->get_memory_sufficient_dependency()),
+ _pipeline_name(_pipeline->name()) {
_pipeline_task_watcher.start();
if (!_shared_state_map.contains(_sink->dests_id().front())) {
@@ -291,24 +291,19 @@ void PipelineTask::terminate() {
auto fragment = _fragment_context.lock();
if (!is_finalized() && fragment) {
DCHECK(_wake_up_early || fragment->is_canceled());
- for (auto* dep : _spill_dependencies) {
- dep->set_always_ready();
- }
-
- for (auto* dep : _filter_dependencies) {
- dep->set_always_ready();
- }
- for (auto& deps : _read_dependencies) {
- for (auto* dep : deps) {
- dep->set_always_ready();
- }
- }
- for (auto* dep : _write_dependencies) {
- dep->set_always_ready();
- }
- for (auto* dep : _finish_dependencies) {
- dep->set_always_ready();
- }
+ std::for_each(_spill_dependencies.begin(), _spill_dependencies.end(),
+ [&](Dependency* dep) { dep->set_always_ready(); });
+ std::for_each(_filter_dependencies.begin(), _filter_dependencies.end(),
+ [&](Dependency* dep) { dep->set_always_ready(); });
+ std::for_each(_write_dependencies.begin(), _write_dependencies.end(),
+ [&](Dependency* dep) { dep->set_always_ready(); });
+ std::for_each(_finish_dependencies.begin(), _finish_dependencies.end(),
+ [&](Dependency* dep) { dep->set_always_ready(); });
+ std::for_each(_read_dependencies.begin(), _read_dependencies.end(),
+ [&](std::vector<Dependency*>& deps) {
+ std::for_each(deps.begin(), deps.end(),
+ [&](Dependency* dep) {
dep->set_always_ready(); });
+ });
_execution_dep->set_ready();
_memory_sufficient_dependency->set_ready();
}
@@ -696,16 +691,16 @@ std::string PipelineTask::debug_string() {
print_id(_state->fragment_instance_id()));
fmt::format_to(debug_string_buffer,
- "PipelineTask[this = {}, id = {}, open = {}, eos = {},
state = {}, dry run = "
+ "PipelineTask[id = {}, open = {}, eos = {}, state = {}, dry
run = "
"{}, _wake_up_early = {}, time elapsed since last state
changing = {}s, spilling"
" = {}, is running = {}]",
- (void*)this, _index, _opened, _eos,
_to_string(_exec_state), _dry_run,
- _wake_up_early.load(), _state_change_watcher.elapsed_time()
/ NANOS_PER_SEC,
- _spilling, is_running());
+ _index, _opened, _eos, _to_string(_exec_state), _dry_run,
_wake_up_early.load(),
+ _state_change_watcher.elapsed_time() / NANOS_PER_SEC,
_spilling, is_running());
std::unique_lock<std::mutex> lc(_dependency_lock);
auto* cur_blocked_dep = _blocked_dep;
auto fragment = _fragment_context.lock();
if (is_finalized() || !fragment) {
+ fmt::format_to(debug_string_buffer, " pipeline name = {}",
_pipeline_name);
return fmt::to_string(debug_string_buffer);
}
auto elapsed = fragment->elapsed_time() / NANOS_PER_SEC;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 4dbaa58feec..36a85f7321e 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -336,6 +336,7 @@ private:
std::atomic<State> _exec_state = State::INITED;
MonotonicStopWatch _state_change_watcher;
std::atomic<bool> _spilling = false;
+ const std::string _pipeline_name;
};
using PipelineTaskSPtr = std::shared_ptr<PipelineTask>;
diff --git a/be/test/pipeline/pipeline_task_test.cpp
b/be/test/pipeline/pipeline_task_test.cpp
index f5a68d2da1b..3e16f21568a 100644
--- a/be/test/pipeline/pipeline_task_test.cpp
+++ b/be/test/pipeline/pipeline_task_test.cpp
@@ -263,7 +263,7 @@ TEST_F(PipelineTaskTest, TEST_OPEN) {
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
- EXPECT_TRUE(task->_spill_dependencies.empty());
+ EXPECT_FALSE(task->_spill_dependencies.empty());
EXPECT_TRUE(task->_opened);
}
}
@@ -360,7 +360,7 @@ TEST_F(PipelineTaskTest, TEST_EXECUTE) {
EXPECT_FALSE(task->_read_dependencies.empty());
EXPECT_FALSE(task->_write_dependencies.empty());
EXPECT_FALSE(task->_finish_dependencies.empty());
- EXPECT_TRUE(task->_spill_dependencies.empty());
+ EXPECT_FALSE(task->_spill_dependencies.empty());
EXPECT_TRUE(task->_opened);
EXPECT_FALSE(read_dep->ready());
EXPECT_TRUE(write_dep->ready());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]