github-actions[bot] commented on code in PR #62190:
URL: https://github.com/apache/doris/pull/62190#discussion_r3048903520


##########
be/src/exec/pipeline/dependency.cpp:
##########
@@ -62,25 +62,44 @@ void Dependency::_add_block_task(std::shared_ptr<PipelineTask> task) {
     _blocked_task.push_back(task);
 }
 
-void Dependency::set_ready() {
+void Dependency::set_ready() noexcept {
     if (_ready) {
         return;
     }
-    std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
-    {
-        std::unique_lock<std::mutex> lc(_task_lock);
-        if (_ready) {
-            return;
-        }
-        _watcher.stop();
-        _ready = true;
-        local_block_task.swap(_blocked_task);
-    }
-    for (auto task : local_block_task) {
-        if (auto t = task.lock()) {
+    try {
+        std::vector<std::weak_ptr<PipelineTask>> local_block_task {};
+        {
             std::unique_lock<std::mutex> lc(_task_lock);
-            THROW_IF_ERROR(t->wake_up(this, lc));
+            if (_ready) {
+                return;
+            }
+            _watcher.stop();
+            _ready = true;
+            local_block_task.swap(_blocked_task);
+        }
+        for (const auto& task : local_block_task) {
+            if (auto t = task.lock()) {
+                std::unique_lock<std::mutex> lc(_task_lock);
+                auto st = t->wake_up(this, lc);
+                if (!st.ok()) {
+                    LOG(WARNING) << "Dependency::set_ready(): failed to wake_up task, cancelling "
+                                    "query. dep="
+                                 << _name << ", task=" << t->debug_string() << ", status=" << st;
+                    if (auto frag = t->fragment_context().lock()) {
+                        frag->cancel(Status::InternalError(
+                                "wake_up failed in Dependency::set_ready: {}", st.to_string()));
+                    }
+                }
+            }
         }
+    } catch (const std::exception& e) {
+        // Non-Doris exceptions (e.g. std::bad_alloc from scheduler submit path).
+        LOG(WARNING) << "Dependency::set_ready(): unexpected exception during wake_up, "
+                        "cancelling query. dep="

Review Comment:
   Once `_ready` is set and `_blocked_task` is swapped out, any exception that 
escapes one iteration aborts the rest of `local_block_task`. The outer catch 
then only logs and returns. A concrete case is `std::bad_alloc` thrown from 
`t->wake_up()` or from the logging/cancel code in this loop: the current task 
is never resubmitted, the dependency is already marked ready, and no later 
`set_ready()` call will revisit the dropped waiters. Those fragments are left 
hanging instead of being cancelled. The exception handling needs to be per task, 
with fragment cancellation or equivalent unblock logic before continuing to the 
next waiter.
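A minimal C++ sketch of the per-task handling the comment asks for, under simplifying assumptions: `Task`, `Fragment`, `cancel_fragment()` and `demo()` are hypothetical stand-ins rather than Doris' `PipelineTask` or fragment-context API, `wake_up()` throws instead of returning a `Status`, and the dependency lock is not handed to `wake_up()`. What it illustrates is that the `try`/`catch` sits inside the loop, so a throwing waiter gets its fragment cancelled and the remaining waiters are still woken.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <new>
#include <string>
#include <vector>

// Hypothetical stand-in for a fragment context: cancel() records why.
struct Fragment {
    bool cancelled = false;
    std::string reason;
    void cancel(const std::string& why) {
        cancelled = true;
        reason = why;
    }
};

// Hypothetical stand-in for PipelineTask. wake_up() can throw, simulating
// std::bad_alloc escaping from a scheduler submit path.
struct Task {
    std::shared_ptr<Fragment> fragment;
    bool throw_on_wake = false;
    bool woken = false;
    void wake_up() {
        if (throw_on_wake) throw std::bad_alloc();
        woken = true;
    }
};

class Dependency {
public:
    void block(const std::shared_ptr<Task>& t) {
        std::lock_guard<std::mutex> lk(_lock);
        _blocked.push_back(t);  // stored as weak_ptr
    }

    // Once _ready is set and the waiters are swapped out, nobody else will
    // revisit them, so every live waiter must be woken or cancelled here.
    void set_ready() noexcept {
        std::vector<std::weak_ptr<Task>> local;
        {
            std::lock_guard<std::mutex> lk(_lock);
            if (_ready) return;
            _ready = true;
            local.swap(_blocked);
        }
        for (const auto& weak : local) {
            auto t = weak.lock();
            if (!t) continue;  // task already destroyed
            try {
                t->wake_up();
            } catch (...) {
                // Per-task handling: cancel this waiter's fragment, then
                // continue with the remaining waiters.
                cancel_fragment(*t);
            }
        }
    }

private:
    static void cancel_fragment(Task& t) noexcept {
        try {
            if (t.fragment) t.fragment->cancel("wake_up failed in set_ready");
        } catch (...) {
            // The cancel path can allocate too. An exception escaping a
            // noexcept function calls std::terminate, so swallow it here.
        }
    }

    std::mutex _lock;
    bool _ready = false;
    std::vector<std::weak_ptr<Task>> _blocked;
};

// Usage: the first waiter throws, yet the other two are still woken.
inline void demo() {
    auto failing = std::make_shared<Task>();
    failing->fragment = std::make_shared<Fragment>();
    failing->throw_on_wake = true;
    auto ok1 = std::make_shared<Task>();
    auto ok2 = std::make_shared<Task>();

    Dependency dep;
    dep.block(failing);
    dep.block(ok1);
    dep.block(ok2);
    dep.set_ready();

    assert(failing->fragment->cancelled && !failing->woken);
    assert(ok1->woken && ok2->woken);
}
```

The sketch catches `...` rather than only `std::exception`, and guards the cancel path with its own `try`/`catch`, because the logging/cancel code can itself throw and any exception leaving a `noexcept` function terminates the process.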



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.