github-actions[bot] commented on code in PR #62190:
URL: https://github.com/apache/doris/pull/62190#discussion_r3048903520
##########
be/src/exec/pipeline/dependency.cpp:
##########
@@ -62,25 +62,44 @@ void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
     _blocked_task.push_back(task);
 }
-void Dependency::set_ready() {
+void Dependency::set_ready() noexcept {
     if (_ready) {
         return;
     }
-    std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
-    {
-        std::unique_lock<std::mutex> lc(_task_lock);
-        if (_ready) {
-            return;
-        }
-        _watcher.stop();
-        _ready = true;
-        local_block_task.swap(_blocked_task);
-    }
-    for (auto task : local_block_task) {
-        if (auto t = task.lock()) {
+    try {
+        std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
+        {
             std::unique_lock<std::mutex> lc(_task_lock);
-            THROW_IF_ERROR(t->wake_up(this, lc));
+            if (_ready) {
+                return;
+            }
+            _watcher.stop();
+            _ready = true;
+            local_block_task.swap(_blocked_task);
+        }
+        for (const auto& task : local_block_task) {
+            if (auto t = task.lock()) {
+                std::unique_lock<std::mutex> lc(_task_lock);
+                auto st = t->wake_up(this, lc);
+                if (!st.ok()) {
+                    LOG(WARNING) << "Dependency::set_ready(): failed to wake_up task, cancelling "
+                                    "query. dep="
+                                 << _name << ", task=" << t->debug_string() << ", status=" << st;
+                    if (auto frag = t->fragment_context().lock()) {
+                        frag->cancel(Status::InternalError(
+                                "wake_up failed in Dependency::set_ready: {}", st.to_string()));
+                    }
+                }
+            }
         }
+    } catch (const std::exception& e) {
+        // Non-Doris exceptions (e.g. std::bad_alloc from scheduler submit path).
+        LOG(WARNING) << "Dependency::set_ready(): unexpected exception during wake_up, "
+                        "cancelling query. dep="
Review Comment:
Once `_ready` is set and `_blocked_task` is swapped out, any exception that
escapes one iteration skips the rest of `local_block_task`, and the outer catch
then only logs and returns. A concrete case is `std::bad_alloc` thrown from
`t->wake_up()` or from the logging/cancel code in this loop: the current task
and every remaining waiter are never resubmitted. Because the dependency is
already marked ready, the early `if (_ready) return;` check turns every later
`set_ready()` call into a no-op, so nothing revisits the dropped waiters. That
leaves the fragment hanging instead of cancelling it. The exception handling
needs to be per task, with fragment cancellation (or equivalent unblock logic)
applied to the failing waiter before continuing to the next one.
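A minimal, self-contained sketch of the per-task handling suggested above, under simplifying assumptions: `Fragment`, `Task`, `add_waiter()` and `cancel_fragment()` are hypothetical stand-ins, not Doris APIs; `wake_up()` returns `bool` where the real code returns a `Status`, and the `_task_lock` that the diff holds around `wake_up()` is left out. Each waiter either gets woken or has its fragment cancelled, and a throw from one waiter no longer skips the rest.

```cpp
#include <memory>
#include <mutex>
#include <new>  // std::bad_alloc
#include <vector>

// Hypothetical stand-in for the fragment context; only cancellation matters here.
struct Fragment {
    bool cancelled = false;
    void cancel() noexcept { cancelled = true; }
};

// Hypothetical stand-in for PipelineTask. wake_up() returns false to model a
// non-OK Status and can throw std::bad_alloc to model an allocation failure.
struct Task {
    std::weak_ptr<Fragment> fragment;
    bool fail_status = false;
    bool throw_bad_alloc = false;
    bool woken = false;

    bool wake_up() {
        if (throw_bad_alloc) throw std::bad_alloc();
        if (fail_status) return false;
        woken = true;
        return true;
    }
};

class Dependency {
public:
    void add_waiter(const std::shared_ptr<Task>& t) {
        std::lock_guard<std::mutex> lk(_lock);
        _blocked.emplace_back(t);  // stored as weak_ptr, like _blocked_task
    }

    // Per-waiter failure handling: no exception from one waiter can skip the others.
    void set_ready() noexcept {
        std::vector<std::weak_ptr<Task>> waiters;
        {
            std::lock_guard<std::mutex> lk(_lock);
            if (_ready) return;  // later calls are no-ops, so this loop is the only chance
            _ready = true;
            waiters.swap(_blocked);  // swap does not allocate
        }
        for (const auto& w : waiters) {
            std::shared_ptr<Task> t = w.lock();
            if (!t) continue;  // task already destroyed; nothing to unblock
            bool ok = false;
            try {
                ok = t->wake_up();
            } catch (...) {
                // e.g. std::bad_alloc: ok stays false and we fall through to cancel
            }
            if (!ok) cancel_fragment(*t);
        }
    }

private:
    // Best-effort cancellation; real code may allocate here (building the
    // cancel Status, logging), so the guard keeps set_ready() noexcept-safe.
    static void cancel_fragment(Task& t) noexcept {
        try {
            if (auto f = t.fragment.lock()) f->cancel();
        } catch (...) {
            // Nothing safer to do at this point.
        }
    }

    std::mutex _lock;
    std::vector<std::weak_ptr<Task>> _blocked;
    bool _ready = false;
};
```

Because `_ready` flips before the loop, every exit path from an iteration ends in either a successful wake-up or a cancellation attempt. The sketch uses `catch (...)` rather than `catch (const std::exception&)` so exceptions of any type are contained per waiter, and it keeps the cancellation path under its own `try` since building the cancel `Status` or logging can itself allocate.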
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]