kaxil commented on code in PR #66573:
URL: https://github.com/apache/airflow/pull/66573#discussion_r3262692658
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1426,6 +1426,25 @@ def _on_term(signum, frame):
"Failed to report terminal task state to supervisor",
state=state.value,
)
+ # Fail closed for non-success terminal states: when the
+ # supervisor never receives the terminal-state message,
+ # exiting 0 would let the supervisor's final_state property
+ # default to SUCCESS (exit_code == 0 with no _terminal_state
+ # set). For a task that actually FAILED / was SKIPPED / etc.,
+ # that turns an IPC blip into silent data-quality breakage
+ # for every downstream task. Exit non-zero so the
+ # supervisor's final_state correctly classifies this as
+ # FAILED (or UP_FOR_RETRY when retries are configured).
+ #
+ # SUCCESS is exempt: a "send the SUCCESS marker, supervisor
+ # rejects with 409 because the server already terminalised
+ # this TI" path is the legitimate scenario the existing
+ # softening targets. In that path the local state is SUCCESS
+ # and the supervisor's default-to-SUCCESS coincides with
+ # reality, so we continue to finalize() so listeners observe
+ # the task state.
+ if state != TaskInstanceState.SUCCESS:
+ sys.exit(1)
Review Comment:
`sys.exit(1)` here raises `SystemExit`, which derives from `BaseException`, not
`Exception`. The caller in `main()` only catches `except Exception:`
(task_runner.py:2036), so the `finalize(ti, state, context, log, error)` call
just above that handler (line 2032) is skipped whenever this branch fires. For a FAILED
task, that means `on_failure_callback`, the `on_task_instance_failed` listener
hook, and `email_on_failure` no longer run. For SKIPPED tasks the
`on_skipped_callback` and skipped listener hook are dropped. The
`task.duration` metric and operator extra links are also lost. So this trades
one silent correctness bug (FAILED -> SUCCESS) for another (user
callbacks/listeners/emails silently dropped on the same IPC failure). Could
`main()` catch `SystemExit` to still call `finalize()` before re-raising, or
could `run()` set a flag that `main()` uses to pick the exit code after
`finalize()` returns?
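A minimal toy model of the control flow described above may make the two options concrete. It assumes a simplified shape for `run()`/`main()`. `report_terminal_state`, `run_exiting`, `run_flagging`, `main_current`, `main_catch_and_reraise` and `main_flag` are hypothetical stand-ins, not the real `task_runner.py` functions, and states are plain strings. It shows `SystemExit` slipping past `except Exception:` so `finalize()` never runs, followed by sketches of both suggested fixes.

```python
from __future__ import annotations

import sys

# Toy model only: every function below is a hypothetical stand-in, NOT the
# real task_runner.py code. States are plain strings for brevity.

calls: list[str] = []  # records which finalize() calls actually happened


def report_terminal_state(state: str) -> bool:
    """Pretend IPC to the supervisor; always fails, modelling the IPC blip."""
    return False


def finalize(state: str) -> None:
    """Stand-in for finalize(): where callbacks, listeners and emails fire."""
    calls.append(f"finalize:{state}")


def run_exiting(state: str) -> str:
    """Shape of the patch under review: sys.exit(1) from inside run()."""
    if not report_terminal_state(state) and state != "success":
        sys.exit(1)  # raises SystemExit, a BaseException subclass
    return state


def main_current(state: str) -> None:
    try:
        final = run_exiting(state)
        finalize(final)  # never reached when run_exiting() exits
    except Exception:  # SystemExit is not an Exception, so it passes through
        calls.append("handled")


def main_catch_and_reraise(state: str) -> None:
    """Option 1: still call finalize() when run() exits, then re-raise."""
    try:
        final = run_exiting(state)
    except SystemExit:
        finalize(state)  # assumes main() still knows the local state here
        raise
    finalize(final)


def run_flagging(state: str) -> tuple[str, bool]:
    """Option 2: run() returns a fail-closed flag instead of exiting."""
    fail_closed = not report_terminal_state(state) and state != "success"
    return state, fail_closed


def main_flag(state: str) -> int:
    final, fail_closed = run_flagging(state)
    finalize(final)  # callbacks/listeners/emails still run
    return 1 if fail_closed else 0  # exit code chosen after finalize()


if __name__ == "__main__":
    try:
        main_current("failed")
    except SystemExit as exc:
        print("current:", exc.code, calls)  # no finalize() entry recorded
    calls.clear()
    print("flag:", main_flag("failed"), calls)
```

In this sketch the flag-based variant keeps the exit-code decision in one place after `finalize()` returns. The catch-and-re-raise variant depends on `main()` still having the state and context it needs for `finalize()` at the point `SystemExit` is caught.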
##########
task-sdk/src/airflow/sdk/execution_time/task_runner.py:
##########
@@ -1426,6 +1426,25 @@ def _on_term(signum, frame):
+ if state != TaskInstanceState.SUCCESS:
Review Comment:
The supervisor's `final_state` (supervisor.py:1398) maps any non-zero exit
with no `_terminal_state` to FAILED (or UP_FOR_RETRY with retries). That's
right for a FAILED task, but this branch also fires for SKIPPED,
UP_FOR_RESCHEDULE, and DEFERRED. A skipped task whose IPC fails will now be
recorded as FAILED, a reschedule-mode sensor becomes FAILED instead of waiting,
and a deferred task becomes FAILED instead of starting its trigger. Is the
right guard `state in {FAILED, UP_FOR_RETRY}` rather than `state != SUCCESS`?
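To compare the two guards, here is a small sketch that tabulates what the supervisor would record when the terminal-state message is lost. It assumes a simplified `final_state` that follows the description above: an explicit terminal state wins, exit 0 means SUCCESS, and a non-zero exit means FAILED or UP_FOR_RETRY. `State` is a local stand-in for `TaskInstanceState`, and `supervisor_final_state`, `exit_code_broad`, `exit_code_narrow` and `recorded` are hypothetical helpers, not Airflow APIs.

```python
from __future__ import annotations

from enum import Enum
from typing import Callable


class State(str, Enum):
    """Local stand-in for TaskInstanceState (only the members discussed)."""
    SUCCESS = "success"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    SKIPPED = "skipped"
    UP_FOR_RESCHEDULE = "up_for_reschedule"
    DEFERRED = "deferred"


def supervisor_final_state(
    exit_code: int, terminal_state: State | None, has_retries: bool
) -> State:
    """Simplified model of the supervisor's final_state as described above."""
    if terminal_state is not None:
        return terminal_state  # the terminal-state message did arrive
    if exit_code == 0:
        return State.SUCCESS  # default-to-SUCCESS on a clean exit
    return State.UP_FOR_RETRY if has_retries else State.FAILED


def exit_code_broad(state: State) -> int:
    """Guard in the patch: every non-SUCCESS state exits 1."""
    return 0 if state == State.SUCCESS else 1


def exit_code_narrow(state: State) -> int:
    """Guard asked about in the review: only FAILED / UP_FOR_RETRY exit 1."""
    return 1 if state in {State.FAILED, State.UP_FOR_RETRY} else 0


def recorded(
    state: State, guard: Callable[[State], int], has_retries: bool = False
) -> State:
    """What the supervisor records when the terminal-state message was lost."""
    return supervisor_final_state(guard(state), None, has_retries)


if __name__ == "__main__":
    for s in State:
        broad = recorded(s, exit_code_broad).value
        narrow = recorded(s, exit_code_narrow).value
        print(f"{s.value:>18}: broad={broad:<8} narrow={narrow}")
```

In this simplified model, the broad guard records SKIPPED, UP_FOR_RESCHEDULE and DEFERRED as FAILED. Under the narrower guard those states fall back to the exit-0 default and are recorded as SUCCESS, so the table surfaces that trade-off rather than resolving it.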
--
This is an automated message from the Apache Git Service.