github-actions[bot] commented on code in PR #62190:
URL: https://github.com/apache/doris/pull/62190#discussion_r3048925372
##########
be/src/exec/pipeline/dependency.cpp:
##########
@@ -62,25 +62,42 @@ void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
_blocked_task.push_back(task);
}
-void Dependency::set_ready() {
+void Dependency::set_ready() noexcept {
if (_ready) {
return;
}
- std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
- {
- std::unique_lock<std::mutex> lc(_task_lock);
- if (_ready) {
- return;
- }
- _watcher.stop();
- _ready = true;
- local_block_task.swap(_blocked_task);
- }
- for (auto task : local_block_task) {
- if (auto t = task.lock()) {
+ try {
+ std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
+ {
std::unique_lock<std::mutex> lc(_task_lock);
- THROW_IF_ERROR(t->wake_up(this, lc));
+ if (_ready) {
+ return;
+ }
+ _watcher.stop();
+ _ready = true;
+ local_block_task.swap(_blocked_task);
+ }
+ for (const auto& task : local_block_task) {
+ if (auto t = task.lock()) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto st = t->wake_up(this, lc);
+ if (!st.ok()) {
Review Comment:
`PipelineTask::wake_up()` is not an atomic "enqueue me" operation. It first
clears `_blocked_dep` and transitions the task to `RUNNABLE`, and only then
calls `submit()` (`pipeline_task.cpp:1047-1055`). If `submit()` returns non-OK
here, this new branch only cancels the fragment and returns. The task has
already been removed from `_blocked_task`, is no longer blocked on this
dependency, and was never inserted into the scheduler queue, so no worker
thread will ever call `close_task()` / `decrement_running_task()` for it.
`PipelineFragmentContext::cancel()` only calls `unblock_all_dependencies()`,
which does not repair that stranded-task state. This turns a scheduler failure
into a potential permanent hang and a leaked running-task count.
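The ordering problem can be reproduced in a minimal sketch. Everything below is a hypothetical stand-in (`SketchTask`, `SketchScheduler`, `sketch_wake_up`, `is_stranded`), not the Doris classes. It assumes only what this comment describes: state is mutated first, and `submit()` runs last and may fail.

```cpp
#include <cassert>
#include <deque>
#include <memory>

// Hypothetical, heavily simplified stand-ins for illustration only.
enum class SketchState { BLOCKED, RUNNABLE };

struct SketchTask {
    SketchState state = SketchState::BLOCKED;
    bool blocked_on_dep = true; // models a non-null _blocked_dep
};

struct SketchScheduler {
    bool accept = true; // set to false to simulate submit() returning non-OK
    std::deque<std::shared_ptr<SketchTask>> queue;

    bool submit(const std::shared_ptr<SketchTask>& t) {
        if (!accept) {
            return false;
        }
        queue.push_back(t);
        return true;
    }
};

// Same ordering as described above: clear the blocked state, mark the task
// RUNNABLE, and only then try to enqueue it.
bool sketch_wake_up(const std::shared_ptr<SketchTask>& t, SketchScheduler& s) {
    assert(t != nullptr);
    t->blocked_on_dep = false;
    t->state = SketchState::RUNNABLE;
    return s.submit(t);
}

// A task is stranded when it looks runnable but sits in no queue, so no
// worker will ever pick it up and close it.
bool is_stranded(const std::shared_ptr<SketchTask>& t, const SketchScheduler& s) {
    if (t->state != SketchState::RUNNABLE || t->blocked_on_dep) {
        return false;
    }
    for (const auto& queued : s.queue) {
        if (queued == t) {
            return false;
        }
    }
    return true;
}
```

In this model, a failed `submit()` leaves the task stranded, and cancelling the fragment afterwards does not change that. One possible direction, offered as a suggestion rather than a verified fix, is to roll the task state back or close the task directly when `submit()` fails.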
##########
be/src/exec/pipeline/dependency.cpp:
##########
@@ -62,25 +62,42 @@ void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
_blocked_task.push_back(task);
}
-void Dependency::set_ready() {
+void Dependency::set_ready() noexcept {
if (_ready) {
return;
}
- std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
- {
- std::unique_lock<std::mutex> lc(_task_lock);
- if (_ready) {
- return;
- }
- _watcher.stop();
- _ready = true;
- local_block_task.swap(_blocked_task);
- }
- for (auto task : local_block_task) {
- if (auto t = task.lock()) {
+ try {
+ std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
+ {
std::unique_lock<std::mutex> lc(_task_lock);
- THROW_IF_ERROR(t->wake_up(this, lc));
+ if (_ready) {
+ return;
+ }
+ _watcher.stop();
+ _ready = true;
+ local_block_task.swap(_blocked_task);
+ }
+ for (const auto& task : local_block_task) {
+ if (auto t = task.lock()) {
+ std::unique_lock<std::mutex> lc(_task_lock);
+ auto st = t->wake_up(this, lc);
+ if (!st.ok()) {
+ LOG(WARNING) << "Dependency::set_ready(): failed to wake_up task, cancelling "
+ "query. dep="
+ << _name << ", task=" << t->debug_string() << ", status=" << st;
+ if (auto frag = t->fragment_context().lock()) {
+ frag->cancel(Status::InternalError(
+ "wake_up failed in Dependency::set_ready: {}", st.to_string()));
+ }
+ }
+ }
+ }
}
+ } catch (const std::exception& e) {
+ // Recovery itself threw (e.g. OOM during logging/cancel). Best effort: nothing more we can do for this task.
Review Comment:
This still aborts the BE process on the exact `std::exception` path the PR
description says should be handled gracefully. In particular, `std::bad_alloc`
during logging or `frag->cancel()` will now hit `CHECK(false)` and crash the
process instead of degrading to a query-local failure. If the intent is truly
`noexcept` + best-effort recovery, this branch needs to avoid process
termination; otherwise the PR description should not claim graceful handling
for standard exceptions.
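For comparison, here is a minimal sketch of a `noexcept` wrapper that degrades instead of terminating. The names `best_effort`, `BestEffortResult`, and `g_swallowed_exceptions` are hypothetical and do not exist in the PR. The sketch assumes the handler should record the failure using only non-allocating, non-throwing operations.

```cpp
#include <atomic>
#include <cassert>
#include <exception>
#include <functional>
#include <new>

// Hypothetical counter: incremented instead of logging, because logging may
// itself allocate and throw again inside the handler.
std::atomic<int> g_swallowed_exceptions{0};

struct BestEffortResult {
    bool completed; // true if the step ran to the end
};

// Runs a recovery step. Nothing escapes the noexcept boundary, so
// std::terminate is never reached, and the handlers neither abort nor allocate.
BestEffortResult best_effort(const std::function<void()>& step) noexcept {
    try {
        step();
        return {true};
    } catch (const std::exception&) {
        g_swallowed_exceptions.fetch_add(1, std::memory_order_relaxed);
        return {false};
    } catch (...) {
        g_swallowed_exceptions.fetch_add(1, std::memory_order_relaxed);
        return {false};
    }
}
```

With this shape, a `std::bad_alloc` thrown while logging or cancelling is reported to the caller as an incomplete step and the process keeps running. Whether that is acceptable for `set_ready()` depends on whether the caller can act on the failure, which the diff alone does not show.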
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.