github-actions[bot] commented on code in PR #61679:
URL: https://github.com/apache/doris/pull/61679#discussion_r2980699413
##########
be/src/exec/pipeline/pipeline_task.cpp:
##########
@@ -473,15 +473,41 @@ Status PipelineTask::execute(bool* done) {
// If task is woke up early, we should terminate all operators, and this task could be closed immediately.
if (_wake_up_early) {
- terminate();
- THROW_IF_ERROR(_root->terminate(_state));
- THROW_IF_ERROR(_sink->terminate(_state));
_eos = true;
*done = true;
} else if (_eos && !_spilling &&
(fragment_context->is_canceled() || !_is_pending_finish())) {
+ // Debug point for testing the race condition fix: inject set_wake_up_early() +
+ // terminate() here to simulate Thread B writing A then B between Thread A's two
+ // reads of _wake_up_early.
+ DBUG_EXECUTE_IF("PipelineTask::execute.wake_up_early_in_else_if", {
+ set_wake_up_early();
+ terminate();
+ });
*done = true;
}
+
+ // NOTE: The terminate() call is intentionally placed AFTER the _is_pending_finish() check
+ // above, not before. This ordering is critical to avoid a race condition:
+ //
+ // Pipeline::make_all_runnable() writes in this order:
+ // (A) set_wake_up_early() -> (B) terminate() [sets finish_dep._always_ready]
+ //
+ // If we checked _wake_up_early (A) before _is_pending_finish() (B), there would be a
+ // window where Thread A reads _wake_up_early=false, then Thread B writes both A and B,
+ // then Thread A reads _is_pending_finish()=false (due to _always_ready). Thread A would
+ // then set *done=true without ever calling operator terminate(), causing close() to run
+ // on operators that were never properly terminated (e.g. RuntimeFilterProducer still in
+ // WAITING_FOR_SYNCED_SIZE state when insert() is called).
+ //
+ // By reading _is_pending_finish() (B) before the second read of _wake_up_early (A),
+ // if Thread A observes B's effect (_always_ready=true), it is guaranteed to also observe
+ // A's effect (_wake_up_early=true) on this second read, ensuring terminate() is called.
+ if (_wake_up_early) {
+ terminate();
+ THROW_IF_ERROR(_root->terminate(_state));
Review Comment:
**Advisory (pre-existing):** Per `be/src/common/AGENTS.md`, `THROW_IF_ERROR`
should be kept out of `Defer` blocks and destructors; the recommendation is to
use `WARN_IF_ERROR` instead. The Doris `Defer` implementation handles
this safely (it checks `std::uncaught_exceptions()` and swallows exceptions
during stack unwinding), but the cost is that any error raised by
`_root->terminate()` or `_sink->terminate()` during unwinding is silently lost.
This is a pre-existing pattern (the original code had the same calls in the
same Defer), so it's not a blocker for this PR. But since the code is being
moved, this would be a good opportunity to switch to `WARN_IF_ERROR` if
operator terminate failures are truly best-effort cleanup.
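To make the trade-off concrete, here is a minimal, self-contained sketch of the behavior described above. It is not the Doris implementation: `SketchStatus`, `SketchDefer`, `throw_if_error`, and `warn_if_error` are hypothetical stand-ins that only mimic the general shape of a status type, a scope guard that swallows exceptions during unwinding, and the two macro styles. It assumes C++17 for `std::uncaught_exceptions()`.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a status type; not the Doris Status class.
struct SketchStatus {
    bool ok_flag = true;
    std::string msg;
    bool ok() const { return ok_flag; }
    static SketchStatus Error(std::string m) { return {false, std::move(m)}; }
};

// Hypothetical scope guard modeled on the behavior the comment describes:
// it runs a callback at scope exit and, if the stack is already unwinding,
// swallows anything the callback throws.
class SketchDefer {
public:
    explicit SketchDefer(std::function<void()> fn)
            : _fn(std::move(fn)), _uncaught(std::uncaught_exceptions()) {}
    ~SketchDefer() noexcept(false) {
        if (std::uncaught_exceptions() > _uncaught) {
            try {
                _fn();
            } catch (...) {
                // Swallowed: the cleanup error never reaches the caller.
            }
        } else {
            _fn();
        }
    }

private:
    std::function<void()> _fn;
    int _uncaught;
};

// Throwing style, analogous in spirit to THROW_IF_ERROR.
void throw_if_error(const SketchStatus& st) {
    if (!st.ok()) throw std::runtime_error(st.msg);
}

// Logging style, analogous in spirit to WARN_IF_ERROR; records into `log`.
void warn_if_error(const SketchStatus& st, std::vector<std::string>& log) {
    if (!st.ok()) log.push_back("warning: " + st.msg);
}

// The cleanup throws while another exception is in flight, so only the
// original failure is observable by the caller.
std::string run_with_throwing_cleanup() {
    try {
        SketchDefer guard([] { throw_if_error(SketchStatus::Error("terminate failed")); });
        throw std::runtime_error("execute failed");
    } catch (const std::exception& e) {
        return e.what();
    }
    return "";
}

// The cleanup logs instead of throwing, so both failures stay visible.
std::vector<std::string> run_with_warning_cleanup() {
    std::vector<std::string> log;
    try {
        SketchDefer guard(
                [&log] { warn_if_error(SketchStatus::Error("terminate failed"), log); });
        throw std::runtime_error("execute failed");
    } catch (const std::exception& e) {
        log.push_back(std::string("error: ") + e.what());
    }
    return log;
}
```

In this sketch, `run_with_throwing_cleanup()` reports only the original failure and the terminate error disappears, whereas `run_with_warning_cleanup()` keeps a record of both. Whether logging is the right choice in the PR still depends on whether operator terminate failures are truly best-effort.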
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]