This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 46d27a21f43 [fix](pipeline) Crashing caused by repeated spill
operations (#56755)
46d27a21f43 is described below
commit 46d27a21f43e85ed4dc642afb4bdd4c722fb0588
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Oct 16 14:41:05 2025 +0800
[fix](pipeline) Crashing caused by repeated spill operations (#56755)
### What problem does this PR solve?
```text
#0 __GI___pthread_sigmask (how=2, newmask=<optimized out>, oldmask=0x0) at
./nptl/pthread_sigmask.c:43
#1 0x00007f91fd6c471e in PosixSignals::chained_handler(int, siginfo*,
void*) [clone .part.0] () from
/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
#2 0x00007f91fd6c5206 in JVM_handle_linux_signal () from
/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
#3 <signal handler called>
#4 __gnu_cxx::__exchange_and_add (__mem=0xe, __val=-1) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/ext/atomicity.h:68
#5 __gnu_cxx::__exchange_and_add_dispatch (__mem=0xe, __val=-1) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/ext/atomicity.h:103
#6 std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_weak_release
(this=0x2) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:211
#7 std::__weak_count<(__gnu_cxx::_Lock_policy)2>::~__weak_count
(this=<optimized out>) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1168
#8 std::__weak_ptr<doris::QueryContext,
(__gnu_cxx::_Lock_policy)2>::~__weak_ptr (this=<optimized out>) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:2003
#9 doris::QueryTaskController::revoke_memory()::$_1::~$_1()
(this=<optimized out>) at
/root/doris/be/src/runtime/workload_management/query_task_controller.cpp:168
#10
std::_Function_base::_Base_manager<doris::QueryTaskController::revoke_memory()::$_1>::_M_destroy(std::_Any_data&,
std::integral_constant<bool, false>) (__victim=...)
at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:177
#11
std::_Function_base::_Base_manager<doris::QueryTaskController::revoke_memory()::$_1>::_M_manager(std::_Any_data&,
std::_Any_data const&, std::_Manager_operation) (__dest=...,
__op=std::__destroy_functor, __source=...)
at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:205
#12 std::_Function_handler<void (doris::pipeline::SpillContext*),
doris::QueryTaskController::revoke_memory()::$_1>::_M_manager(std::_Any_data&,
std::_Any_data const&, std::_Manager_operation) (__dest=..., __source=...,
__op=std::__destroy_functor)
at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:284
#13 0x000055f37fba60c5 in std::_Function_base::~_Function_base
(this=0x7f8dbb0c9fe0) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:246
#14 doris::pipeline::SpillContext::~SpillContext (this=0x7f91536f0150) at
/root/doris/be/src/pipeline/exec/spill_utils.h:57
#15 0x000055f384259e4b in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release
(this=0x7f91536f0140) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:345
#16 std::__shared_count<(__gnu_cxx::_Lock_policy)2>::operator=
(this=0x7f8d61302518, __r=...) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1088
#17 std::__shared_ptr<doris::pipeline::SpillContext,
(__gnu_cxx::_Lock_policy)2>::operator= (this=0x7f8d61302510) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr_base.h:1530
#18 std::shared_ptr<doris::pipeline::SpillContext>::operator=
(this=0x7f8d61302510) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/shared_ptr.h:413
#19 doris::pipeline::PipelineTask::revoke_memory (this=0x7f8d61302490,
spill_context=...) at /root/doris/be/src/pipeline/pipeline_task.cpp:812
#20 0x000055f37fba3c2d in doris::QueryTaskController::revoke_memory
(this=<optimized out>) at
/root/doris/be/src/runtime/workload_management/query_task_controller.cpp:185
#21 0x000055f37fb99d08 in doris::WorkloadGroupMgr::handle_single_query_
(this=<optimized out>, requestor=...,
size_to_reserve=size_to_reserve@entry=1024000,
time_in_queue=time_in_queue@entry=27, paused_reason=...)
at
/root/doris/be/src/runtime/workload_group/workload_group_manager.cpp:820
#22 0x000055f37fb981d0 in doris::WorkloadGroupMgr::handle_paused_queries
(this=0x7f9141d8a800) at
/root/doris/be/src/runtime/workload_group/workload_group_manager.cpp:381
#23 0x000055f37eb97297 in doris::Daemon::memory_maintenance_thread
(this=0x7ffe63cad730) at /root/doris/be/src/common/daemon.cpp:354
#24 0x000055f37fd812fc in std::function<void ()>::operator()() const
(this=0x7f8dbb0c9fe0) at
/usr/local/ldb-toolchain-v0.26/bin/../lib/gcc/x86_64-pc-linux-gnu/15/include/g++-v15/bits/std_function.h:593
#25 doris::Thread::supervise_thread (arg=0x7f914ba25e10) at
/root/doris/be/src/util/thread.cpp:460
#26 0x00007f91fc75fac3 in start_thread (arg=<optimized out>) at
./nptl/pthread_create.c:442
#27 0x00007f91fc7f1850 in __closefrom_fallback (from=1674236160,
dirfd_fallback=<optimized out>) at
../sysdeps/unix/sysv/linux/closefrom_fallback.c:45
#28 0x0000000000000000 in ?? ()
```
Related PR: #xxx
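Problem Summary:
The backtrace above shows the BE crashing inside
`PipelineTask::revoke_memory` (pipeline_task.cpp:812, frame #19): assigning
a new value to the task's `_spill_context` member released the previously
stored `SpillContext`, and destroying that object's callback (a lambda
holding a `std::weak_ptr<QueryContext>`) dereferenced an invalid pointer.
Presumably this happens when a second revoke request reaches a task whose
earlier request is still pending (the "repeated spill" in the title).
This commit:
- removes the `_need_to_revoke_memory` / `_spill_context` members from
  `PipelineTask`, so a revoke request no longer mutates the task;
- adds `RevokableTask`, a wrapper that owns its own `SpillContext` and
  whose `execute()` calls the new `PipelineTask::do_revoke_memory()`;
  `revoke_memory()` now submits one such wrapper per request;
- makes `set_running()` return the previous value, so that
  `TaskScheduler::_do_work` checks and sets the running flag in one step
  instead of calling `is_running()` and `set_running(true)` separately;
- makes `get_revocable_size()` return 0 for a task that is not opened yet;
- lowers the "spill task started" log in `SpillRecoverRunnable` from
  `LOG(INFO)` to `VLOG_DEBUG`.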
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/pipeline/exec/spill_utils.h | 4 +-
be/src/pipeline/pipeline_task.cpp | 73 +++++++++++++++++++++++-------------
be/src/pipeline/pipeline_task.h | 40 +++++++++++---------
be/src/pipeline/revokable_task.h | 76 ++++++++++++++++++++++++++++++++++++++
be/src/pipeline/task_scheduler.cpp | 11 +++++-
5 files changed, 157 insertions(+), 47 deletions(-)
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index d6f2a811f3a..beb2ecaa984 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -201,8 +201,8 @@ protected:
}
void _on_task_started() override {
- LOG(INFO) << "SpillRecoverRunnable, Query: " <<
print_id(_state->query_id())
- << " spill task started, pipeline task id: " <<
_state->task_id();
+ VLOG_DEBUG << "SpillRecoverRunnable, Query: " <<
print_id(_state->query_id())
+ << " spill task started, pipeline task id: " <<
_state->task_id();
COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
COUNTER_UPDATE(_reading_task_count, 1);
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 9f1512982b9..cee0c0562b2 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -23,6 +23,7 @@
#include <glog/logging.h>
#include <algorithm>
+#include <memory>
#include <ostream>
#include <vector>
@@ -35,6 +36,7 @@
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
+#include "revokable_task.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/query_context.h"
@@ -99,14 +101,15 @@ PipelineTask::~PipelineTask() {
// But pipeline task hold some objects, like operators, shared state, etc. So
that should release
// memory manually.
#ifndef BE_TEST
- SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+ if (_query_mem_tracker) {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_mem_tracker);
+ }
#endif
_shared_state_map.clear();
_sink_shared_state.reset();
_op_shared_states.clear();
_sink.reset();
_operators.clear();
- _spill_context.reset();
_block.reset();
_pipeline.reset();
}
@@ -306,17 +309,12 @@ bool PipelineTask::is_blockable() const {
}
}
- return _need_to_revoke_memory ||
- std::ranges::any_of(_operators,
+ return std::ranges::any_of(_operators,
[&](OperatorPtr op) -> bool { return
op->is_blockable(_state); }) ||
_sink->is_blockable(_state);
}
bool PipelineTask::_is_blocked() {
- if (_need_to_revoke_memory) {
- return false;
- }
-
// `_dry_run = true` means we do not need data from source operator.
if (!_dry_run) {
for (int i = cast_set<int>(_read_dependencies.size() - 1); i >= 0;
i--) {
@@ -378,11 +376,15 @@ void PipelineTask::terminate() {
* @return
*/
Status PipelineTask::execute(bool* done) {
- if (!_need_to_revoke_memory && (_exec_state != State::RUNNABLE ||
_blocked_dep != nullptr))
- [[unlikely]] {
+ if (_exec_state != State::RUNNABLE || _blocked_dep != nullptr)
[[unlikely]] {
+#ifdef BE_TEST
return Status::InternalError("Pipeline task is not runnable! Task
info: {}",
debug_string());
+#else
+ return Status::FatalError("Pipeline task is not runnable! Task info:
{}", debug_string());
+#endif
}
+
auto fragment_context = _fragment_context.lock();
if (!fragment_context) {
return Status::InternalError("Fragment already finished! Query: {}",
print_id(_query_id));
@@ -477,11 +479,6 @@ Status PipelineTask::execute(bool* done) {
break;
}
- if (_need_to_revoke_memory) {
- _need_to_revoke_memory = false;
- return _sink->revoke_memory(_state, _spill_context);
- }
-
if (time_spent > _exec_time_slice) {
COUNTER_UPDATE(_yield_counts, 1);
break;
@@ -610,6 +607,33 @@ Status PipelineTask::execute(bool* done) {
return Status::OK();
}
+Status PipelineTask::do_revoke_memory(const std::shared_ptr<SpillContext>&
spill_context) {
+ auto fragment_context = _fragment_context.lock();
+ if (!fragment_context) {
+ return Status::InternalError("Fragment already finished! Query: {}",
print_id(_query_id));
+ }
+
+ SCOPED_ATTACH_TASK(_state);
+ ThreadCpuStopWatch cpu_time_stop_watch;
+ cpu_time_stop_watch.start();
+ Defer running_defer {[&]() {
+ int64_t delta_cpu_time = cpu_time_stop_watch.elapsed_time();
+ _task_cpu_timer->update(delta_cpu_time);
+
fragment_context->get_query_ctx()->resource_ctx()->cpu_context()->update_cpu_cost_ms(
+ delta_cpu_time);
+
+ // If task is woke up early, we should terminate all operators, and
this task could be closed immediately.
+ if (_wake_up_early) {
+ terminate();
+ THROW_IF_ERROR(_root->terminate(_state));
+ THROW_IF_ERROR(_sink->terminate(_state));
+ _eos = true;
+ }
+ }};
+
+ return _sink->revoke_memory(_state, spill_context);
+}
+
bool PipelineTask::_try_to_reserve_memory(const size_t reserve_size,
OperatorBase* op) {
auto st =
thread_context()->thread_mem_tracker_mgr->try_reserve(reserve_size);
COUNTER_UPDATE(_memory_reserve_times, 1);
@@ -794,7 +818,7 @@ std::string PipelineTask::debug_string() {
}
size_t PipelineTask::get_revocable_size() const {
- if (is_finalized() || _running || (_eos && !_spilling)) {
+ if (!_opened || is_finalized() || _running || (_eos && !_spilling)) {
return 0;
}
@@ -802,22 +826,19 @@ size_t PipelineTask::get_revocable_size() const {
}
Status PipelineTask::revoke_memory(const std::shared_ptr<SpillContext>&
spill_context) {
+ DCHECK(spill_context);
if (is_finalized()) {
- if (spill_context) {
- spill_context->on_task_finished();
- VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ",
task: " << ((void*)this)
- << " finalized";
- }
+ spill_context->on_task_finished();
+ VLOG_DEBUG << "Query: " << print_id(_state->query_id()) << ", task: "
<< ((void*)this)
+ << " finalized";
return Status::OK();
}
const auto revocable_size = _sink->revocable_mem_size(_state);
if (revocable_size >= vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- _need_to_revoke_memory = true;
- _spill_context = spill_context;
- RETURN_IF_ERROR(
-
_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(shared_from_this()));
- } else if (spill_context) {
+ auto revokable_task =
std::make_shared<RevokableTask>(shared_from_this(), spill_context);
+
RETURN_IF_ERROR(_state->get_query_ctx()->get_pipe_exec_scheduler()->submit(revokable_task));
+ } else {
spill_context->on_task_finished();
LOG(INFO) << "Query: " << print_id(_state->query_id()) << ", task: "
<< ((void*)this)
<< " has not enough data to revoke: " << revocable_size;
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 41019e4c598..e2d51858be4 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -55,24 +55,24 @@ public:
shared_state_map,
int task_idx);
- ~PipelineTask();
+ virtual ~PipelineTask();
Status prepare(const std::vector<TScanRangeParams>& scan_range, const int
sender_id,
const TDataSink& tsink);
- Status execute(bool* done);
+ virtual Status execute(bool* done);
// if the pipeline create a bunch of pipeline task
// must be call after all pipeline task is finish to release resource
- Status close(Status exec_status, bool close_sink = true);
+ virtual Status close(Status exec_status, bool close_sink = true);
- std::weak_ptr<PipelineFragmentContext>& fragment_context() { return
_fragment_context; }
+ virtual std::weak_ptr<PipelineFragmentContext>& fragment_context() {
return _fragment_context; }
int get_thread_id(int num_threads) const {
return _thread_id == -1 ? _thread_id : _thread_id % num_threads;
}
- PipelineTask& set_thread_id(int thread_id) {
+ virtual PipelineTask& set_thread_id(int thread_id) {
_thread_id = thread_id;
if (thread_id != _thread_id) {
COUNTER_UPDATE(_core_change_times, 1);
@@ -80,7 +80,7 @@ public:
return *this;
}
- Status finalize();
+ virtual Status finalize();
std::string debug_string();
@@ -94,7 +94,7 @@ public:
* Pipeline task is blockable means it will be blocked in the next run. So
we should put it into
* the blocking task scheduler.
*/
- bool is_blockable() const;
+ virtual bool is_blockable() const;
/**
* `shared_state` is shared by different pipeline tasks. This function
aims to establish
@@ -125,7 +125,7 @@ public:
DataSinkOperatorPtr sink() const { return _sink; }
int task_id() const { return _index; };
- bool is_finalized() const { return _exec_state == State::FINALIZED; }
+ virtual bool is_finalized() const { return _exec_state ==
State::FINALIZED; }
void set_wake_up_early(PipelineId wake_by = -1) {
_wake_up_early = true;
@@ -153,19 +153,24 @@ public:
void pop_out_runnable_queue() { _wait_worker_watcher.stop(); }
bool is_running() { return _running.load(); }
- PipelineTask& set_running(bool running) {
- _running.exchange(running);
- return *this;
+ virtual bool set_running(bool running) {
+ bool old_value = !running;
+ _running.compare_exchange_weak(old_value, running);
+ return old_value;
}
- RuntimeState* runtime_state() const { return _state; }
+ virtual RuntimeState* runtime_state() const { return _state; }
+
+ virtual std::string task_name() const {
+ return fmt::format("task{}({})", _index, _pipeline->_name);
+ }
- std::string task_name() const { return fmt::format("task{}({})", _index,
_pipeline->_name); }
+ [[nodiscard]] Status do_revoke_memory(const std::shared_ptr<SpillContext>&
spill_context);
// TODO: Maybe we do not need this safe code anymore
void stop_if_finished();
- PipelineId pipeline_id() const { return _pipeline->id(); }
+ virtual PipelineId pipeline_id() const { return _pipeline->id(); }
[[nodiscard]] size_t get_revocable_size() const;
[[nodiscard]] Status revoke_memory(const std::shared_ptr<SpillContext>&
spill_context);
@@ -175,6 +180,10 @@ public:
return _state_transition(PipelineTask::State::BLOCKED);
}
+protected:
+ // Only used for RevokableTask
+ PipelineTask() : _index(0) {}
+
private:
// Whether this task is blocked before execution (FE 2-phase commit
trigger, runtime filters)
bool _wait_to_start();
@@ -214,9 +223,6 @@ private:
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
- bool _need_to_revoke_memory = false;
- std::shared_ptr<SpillContext> _spill_context;
-
RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
RuntimeProfile::Counter* _task_cpu_timer = nullptr;
diff --git a/be/src/pipeline/revokable_task.h b/be/src/pipeline/revokable_task.h
new file mode 100644
index 00000000000..d4d253c2703
--- /dev/null
+++ b/be/src/pipeline/revokable_task.h
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline.h"
+#include "pipeline/pipeline_task.h"
+#include "pipeline_task.h"
+
+namespace doris {
+class RuntimeState;
+
+namespace pipeline {
+class PipelineFragmentContext;
+
+class RevokableTask : public PipelineTask {
+public:
+ RevokableTask(PipelineTaskSPtr task, std::shared_ptr<SpillContext>
spill_context)
+ : _task(std::move(task)), _spill_context(std::move(spill_context))
{}
+
+ ~RevokableTask() override = default;
+
+ RuntimeState* runtime_state() const override { return
_task->runtime_state(); }
+
+ Status close(Status exec_status, bool close_sink) override {
+ return _task->close(exec_status, close_sink);
+ }
+
+ Status finalize() override { return _task->finalize(); }
+
+ bool set_running(bool running) override { return
_task->set_running(running); }
+
+ bool is_finalized() const override { return _task->is_finalized(); }
+
+ std::weak_ptr<PipelineFragmentContext>& fragment_context() override {
+ return _task->fragment_context();
+ }
+
+ PipelineTask& set_thread_id(int thread_id) override { return
_task->set_thread_id(thread_id); }
+
+ PipelineId pipeline_id() const override { return _task->pipeline_id(); }
+
+ std::string task_name() const override { return _task->task_name(); }
+
+ Status execute(bool* done) override { return
_task->do_revoke_memory(_spill_context); }
+
+ bool is_blockable() const override { return true; }
+
+private:
+ PipelineTaskSPtr _task;
+ std::shared_ptr<SpillContext> _spill_context;
+};
+
+} // namespace pipeline
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 794a08155d1..228335d1aa0 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -104,19 +104,26 @@ void TaskScheduler::_do_work(int index) {
// The task is already running, maybe block in now dependency wake up
by other thread
// but the block thread still hold the task, so put it back to the
queue, until the hold
// thread set task->set_running(false)
- if (task->is_running()) {
+ // set_running return the old value
+ if (task->set_running(true)) {
static_cast<void>(_task_queue.push_back(task, index));
continue;
}
+
if (task->is_finalized()) {
+ task->set_running(false);
continue;
}
+
auto fragment_context = task->fragment_context().lock();
if (!fragment_context) {
// Fragment already finished
+ task->set_running(false);
continue;
}
- task->set_running(true).set_thread_id(index);
+
+ task->set_thread_id(index);
+
bool done = false;
auto status = Status::OK();
int64_t exec_ns = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]