This is an automated email from the ASF dual-hosted git repository.

vatsrahul1001 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e91517ffd0 Fail closed when supervisor IPC fails on a non-success terminal state (#66573)
7e91517ffd0 is described below

commit 7e91517ffd01f0608f21349ad8aa86595617d220
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue May 19 15:40:18 2026 +0200

    Fail closed when supervisor IPC fails on a non-success terminal state (#66573)
    
    * Fail closed when supervisor IPC fails on a non-success terminal state
    
    When a task FAILED / SKIPPED / etc. and the IPC send of the
    terminal-state message to the supervisor itself raised, the existing
    finally block logged the exception and let run() return normally.
    The task subprocess then exited with code 0, which the supervisor's
    final_state property maps to SUCCESS for an exit_code-0 process
    without a _terminal_state (the supervisor never received the
    message). A genuine task FAILURE was silently being upgraded to
    SUCCESS on transient IPC failures, breaking downstream pipeline
    correctness without any signal.
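
    A minimal sketch of the misclassification. The `final_state()` function
    and two-member `State` enum below are hypothetical stand-ins for the
    supervisor's property and TaskInstanceState (the real property covers
    more states, including UP_FOR_RETRY). An explicit terminal-state message
    wins; without one, exit code 0 is read as SUCCESS:

```python
from enum import Enum
from typing import Optional


class State(str, Enum):
    # Hypothetical subset of TaskInstanceState, for illustration only.
    SUCCESS = "success"
    FAILED = "failed"


def final_state(exit_code: int, terminal_state: Optional[State]) -> State:
    """Hypothetical model of the supervisor mapping described above.

    An explicit terminal-state message wins; otherwise the exit code decides.
    """
    if terminal_state is not None:
        return terminal_state
    return State.SUCCESS if exit_code == 0 else State.FAILED


# Task failed, the FAILED message never reached the supervisor, process exited 0:
print(final_state(0, None).value)  # success: the failure is reported as SUCCESS
# Same failure, but the process exits non-zero (the behaviour this commit restores):
print(final_state(1, None).value)  # failed
```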
    
    Exit non-zero from the finally block when the terminal state is
    anything other than SUCCESS, so the supervisor's final_state
    correctly classifies as FAILED (or UP_FOR_RETRY when retries are
    configured). The SUCCESS exemption preserves the existing softening
    for the legitimate scenario where the supervisor rejects the
    terminal-state send with a 409 because the server already
    terminalised the TI -- covered by
    test_run_swallows_supervisor_terminal_send_failure, which continues
    to pass.
    
    New regression test: when the task fails and the supervisor IPC send
    raises (BrokenPipeError simulating a dead Unix socket), run() now
    raises SystemExit(1).
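
    The dead socket can be simulated with a `unittest.mock` side effect that
    raises only for the terminal-state message and returns `mock.DEFAULT`
    (the mock's normal return value) for everything else, which is the
    pattern the tests in this commit use. `TaskState` and `OtherMessage`
    below are hypothetical stand-ins for the real message classes:

```python
from unittest import mock


class TaskState:
    """Hypothetical stand-in for the terminal-state IPC message."""


class OtherMessage:
    """Hypothetical stand-in for any other message the task sends."""


comms = mock.MagicMock()


def send_side_effect(msg=None, **kwargs):
    # Only the terminal-state message hits the "dead socket"; everything
    # else falls through to the mock's normal return value.
    if isinstance(msg, TaskState):
        raise BrokenPipeError("supervisor IPC broken")
    return mock.DEFAULT


comms.send.side_effect = send_side_effect

comms.send(OtherMessage())  # succeeds and returns comms.send.return_value
try:
    comms.send(TaskState())
except BrokenPipeError as exc:
    print(f"terminal-state send failed: {exc}")
```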
    
    Reported by the L3 ASVS sweep at apache/tooling-agents#24 (FINDING-006).
    
    * Address review comments: defer fail-closed exit, narrow guard
    
    - Narrow the fail-closed guard from `state != SUCCESS` to FAILED /
      UP_FOR_RETRY only (kaxil). SKIPPED / UP_FOR_RESCHEDULE / DEFERRED
      would otherwise be mismapped to FAILED by the supervisor's final_state,
      which is strictly worse than the default mapping.
    
    - Stop calling sys.exit(1) inside run()'s finally — SystemExit is a
      BaseException so main()'s `except Exception:` would not catch it
      and finalize() would be skipped, silently dropping
      on_failure_callback / on_retry_callback / listener hooks /
      email_on_failure / email_on_retry on the same IPC failure (kaxil).
      Signal via `_terminal_state_send_failed` on the ti and let main()
      sys.exit(1) after finalize() has run.
    
    - Remove the redundant inline `from ... import run` in the test
      (Lee-W, kaxil) — `run` is already imported at module level.
    
    - Rework the existing regression test to assert the new contract
      (run() returns FAILED + sets the flag) and add a listener-based
      test that locks in callbacks/listeners still firing on the
      IPC-broken path (kaxil).
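
    A minimal sketch of why the exit is deferred, using hypothetical
    stand-ins (`run_deferred`, `run_exit_early`, `send_terminal_state`, a
    plain `TI` class and a `State` enum) rather than the real task runner.
    `SystemExit` derives from `BaseException`, so `except Exception:` in
    `main()` never sees it: exiting inside `run()` skips the finalize step,
    while setting the flag and exiting afterwards lets it run first. The
    sketch also applies the narrowed FAILED / UP_FOR_RETRY guard:

```python
import sys
from enum import Enum


class State(str, Enum):
    # Hypothetical subset of TaskInstanceState.
    SUCCESS = "success"
    FAILED = "failed"
    UP_FOR_RETRY = "up_for_retry"
    SKIPPED = "skipped"


class TI:
    # Stand-in for RuntimeTaskInstance; the flag defaults to False.
    _terminal_state_send_failed: bool = False


def send_terminal_state(state: State) -> None:
    # Hypothetical IPC call that always fails, simulating a dead socket.
    raise BrokenPipeError("supervisor IPC broken")


def run_exit_early(ti: TI, state: State) -> State:
    # Rejected approach: exit from inside run().
    try:
        send_terminal_state(state)
    except Exception:
        if state in (State.FAILED, State.UP_FOR_RETRY):
            sys.exit(1)  # SystemExit escapes main()'s `except Exception:`
    return state


def run_deferred(ti: TI, state: State) -> State:
    # Adopted approach: record the failure and let main() exit later.
    try:
        send_terminal_state(state)
    except Exception:
        if state in (State.FAILED, State.UP_FOR_RETRY):
            ti._terminal_state_send_failed = True
    return state


def main(run_fn, state: State):
    """Return (steps that ran, exit code or None for a clean return)."""
    events: list[str] = []
    ti = TI()
    try:
        try:
            state = run_fn(ti, state)
            events.append("finalize")  # callbacks / listeners fire here
            if ti._terminal_state_send_failed:
                sys.exit(1)  # finalize() has already run
        except Exception:
            events.append("except Exception")  # never reached by SystemExit
    except SystemExit as exc:
        # Stand-in for the interpreter terminating the process.
        return events, exc.code
    return events, None


print(main(run_deferred, State.FAILED))    # (['finalize'], 1)
print(main(run_exit_early, State.FAILED))  # ([], 1)
print(main(run_deferred, State.SKIPPED))   # (['finalize'], None)
```

    SKIPPED (like SUCCESS) returns cleanly because the narrowed guard leaves
    it to the supervisor's default mapping instead of forcing FAILED.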
    
    * Declare _terminal_state_send_failed on RuntimeTaskInstance for mypy
    
    The fail-closed path in _handle_current_task_failure set
    ti._terminal_state_send_failed = True dynamically without a class-level
    declaration, so mypy raised [attr-defined].
    Add the field as a Pydantic PrivateAttr (bool, default False) matching the
    existing _cached_template_context pattern. No runtime behavior change —
    getattr(ti, '_terminal_state_send_failed', False) still returns False for
    the unset case, now via the PrivateAttr default instead of the getattr 
fallback.
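
    A sketch of the "no runtime behavior change" claim, with plain Python
    classes standing in for the Pydantic model (this does not use
    `PrivateAttr` itself; `Undeclared`, `Declared` and `read_flag` are
    hypothetical names). The `getattr(..., False)` read gives the same result
    whether the flag is undeclared or declared with a class-level default;
    the declaration is what lets mypy see the attribute:

```python
class Undeclared:
    """Before: the flag exists only after something assigns it."""


class Declared:
    """After: a class-level default, standing in for the PrivateAttr."""

    _terminal_state_send_failed: bool = False


def read_flag(ti: object) -> bool:
    # The read used by main(): falls back to False when the attribute is absent.
    return getattr(ti, "_terminal_state_send_failed", False)


for cls in (Undeclared, Declared):
    ti = cls()
    assert read_flag(ti) is False  # unset: getattr fallback vs. class default
    # Assigning an undeclared attribute like this is what mypy reported as
    # [attr-defined]; at runtime it works for both classes.
    ti._terminal_state_send_failed = True
    assert read_flag(ti) is True
```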
    
    ---------
    
    Co-authored-by: vatsrahul1001 <[email protected]>
---
 .../src/airflow/sdk/execution_time/task_runner.py  |  28 +++++
 .../task_sdk/execution_time/test_task_runner.py    | 127 +++++++++++++++++++++
 2 files changed, 155 insertions(+)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index 9cb766c9b2d..61e476e60b9 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -183,6 +183,9 @@ class RuntimeTaskInstance(TaskInstance):
     _cached_template_context: Context | None = None
     """The Task Instance context. This is used to cache get_template_context."""
 
+    _terminal_state_send_failed: bool = False
+    """True when the supervisor IPC send for a non-success terminal state raised; signals main() to sys.exit(1) after finalize() so the supervisor doesn't misclassify the run as SUCCESS via exit code 0."""
+
     _ti_context_from_server: Annotated[TIRunContext | None, Field(repr=False)] = None
     """The Task Instance context from the API server, if any."""
 
@@ -1429,6 +1432,26 @@ def run(
                     "Failed to report terminal task state to supervisor",
                     state=state.value,
                 )
+                # Fail closed for FAILED / UP_FOR_RETRY: when the supervisor
+                # never receives the terminal-state message, exiting 0 would
+                # let the supervisor's final_state property default to
+                # SUCCESS (exit_code == 0 with no _terminal_state set),
+                # turning a real failure into a silent data-quality bug for
+                # every downstream task. We signal main() to sys.exit(1)
+                # AFTER finalize() runs, so on_failure_callback /
+                # on_retry_callback / listener hooks / email_on_failure /
+                # email_on_retry still fire. sys.exit(1) directly here would
+                # raise SystemExit, which is BaseException, not Exception —
+                # main()'s `except Exception:` would not catch it and
+                # finalize() at the call site would be skipped.
+                #
+                # SKIPPED / UP_FOR_RESCHEDULE / DEFERRED are intentionally
+                # not fail-closed: supervisor's final_state would misclassify
+                # them too, but exiting non-zero would map them to FAILED,
+                # which is strictly worse than the default. Those need a
+                # separate fix in supervisor's final_state.
+                if state in (TaskInstanceState.FAILED, TaskInstanceState.UP_FOR_RETRY):
+                    ti._terminal_state_send_failed = True
 
     # Return the message to make unit tests easier too
     ti.state = state
@@ -2030,6 +2053,11 @@ def main():
                 state, _, error = run(ti, context, log)
                 context["exception"] = error
                 finalize(ti, state, context, log, error)
+                # If run() couldn't deliver a FAILED / UP_FOR_RETRY terminal
+                # state to the supervisor, fail closed now — finalize() has
+                # already run, so callbacks and listeners observed the state.
+                if getattr(ti, "_terminal_state_send_failed", False):
+                    sys.exit(1)
         except KeyboardInterrupt:
             log.exception("Ctrl-c hit")
             sys.exit(2)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 54afc412f56..d7b72b2c48f 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -475,6 +475,86 @@ def test_run_swallows_supervisor_terminal_send_failure(create_runtime_ti, mock_s
     assert error is None
 
 
+def test_run_signals_fail_closed_when_failure_terminal_send_fails(create_runtime_ti, mock_supervisor_comms):
+    """
+    When the task FAILS and the terminal-state send to the supervisor fails too
+    (e.g. broken Unix socket / supervisor crashed / IPC channel dead), `run()`
+    must signal to main() that the process should exit non-zero — otherwise
+    the supervisor's `final_state` property defaults exit_code-0-with-no-
+    terminal-state to SUCCESS, turning a transient IPC blip into a silent
+    data-quality bug downstream.
+
+    The signal is deferred to main() (via `_terminal_state_send_failed` on the
+    ti) so finalize() still runs first — on_failure_callback / listener hooks /
+    email_on_failure must observe the FAILED state before the process exits.
+    """
+
+    class FailingOperator(BaseOperator):
+        def execute(self, context):
+            raise RuntimeError("task body failed")
+
+    task = FailingOperator(task_id="failing")
+    runtime_ti = create_runtime_ti(task=task)
+    context = runtime_ti.get_template_context()
+    log = mock.MagicMock()
+
+    # Let the terminal-state send raise an IPC-level failure.
+    def send_side_effect(msg=None, **kwargs):
+        if isinstance(msg, TaskState):
+            raise BrokenPipeError("supervisor IPC broken")
+        return mock.DEFAULT
+
+    mock_supervisor_comms.send.side_effect = send_side_effect
+
+    # run() must not raise — fail-closed is signalled via the ti attribute
+    # so main() can sys.exit(1) only after finalize() has run.
+    state, _, _ = run(runtime_ti, context, log)
+
+    assert state == TaskInstanceState.FAILED
+    assert runtime_ti._terminal_state_send_failed is True
+
+
+@pytest.mark.parametrize(
+    ("state_when_send_fails", "should_fail_closed"),
+    [
+        (TaskInstanceState.SUCCESS, False),
+        (TaskInstanceState.SKIPPED, False),
+    ],
+)
+def test_run_does_not_signal_fail_closed_for_non_failed_states(
+    create_runtime_ti, mock_supervisor_comms, state_when_send_fails, should_fail_closed
+):
+    """
+    Only FAILED / UP_FOR_RETRY are fail-closed. SUCCESS is exempt (the existing
+    409-rejection softening). SKIPPED is also exempt: supervisor's final_state
+    misclassifies it either way, and exiting non-zero would map it to FAILED,
+    which is strictly worse than the default mapping.
+    """
+
+    class Op(BaseOperator):
+        def execute(self, context):
+            if state_when_send_fails == TaskInstanceState.SKIPPED:
+                raise AirflowSkipException("skip")
+            return "ok"
+
+    task = Op(task_id="op")
+    runtime_ti = create_runtime_ti(task=task)
+    context = runtime_ti.get_template_context()
+    log = mock.MagicMock()
+
+    def send_side_effect(msg=None, **kwargs):
+        if isinstance(msg, (TaskState, SucceedTask)):
+            raise BrokenPipeError("supervisor IPC broken")
+        return mock.DEFAULT
+
+    mock_supervisor_comms.send.side_effect = send_side_effect
+
+    state, _, _ = run(runtime_ti, context, log)
+
+    assert state == state_when_send_fails
+    assert getattr(runtime_ti, "_terminal_state_send_failed", False) is should_fail_closed
+
+
 def test_task_span_is_child_of_dag_run_span(make_ti_context):
     """Full trace hierarchy: dag_run → task_run.my_task (API server) → worker.my_task (task runner)."""
     # Single provider shared by all spans so contexts are compatible.
@@ -4076,6 +4156,53 @@ class TestTaskRunnerCallsListeners:
         assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.FAILED]
         assert listener.error == error
 
+    def test_task_runner_calls_listeners_failed_when_terminal_send_fails(
+        self, mocked_parse, mock_supervisor_comms, listener_manager
+    ):
+        """Callbacks/listeners must still fire when the FAILED terminal-state
+        IPC send to the supervisor fails. The fail-closed exit is deferred to
+        main() (signalled via `_terminal_state_send_failed` on the ti) so
+        finalize() runs first.
+        """
+        listener = self.CustomListener()
+        listener_manager(listener)
+
+        class CustomOperator(BaseOperator):
+            def execute(self, context):
+                raise RuntimeError("task body failed")
+
+        task = CustomOperator(task_id="failing_with_broken_ipc")
+        dag = get_inline_dag(dag_id="test_dag", task=task)
+        ti = TaskInstance(
+            id=uuid7(),
+            task_id=task.task_id,
+            dag_id=dag.dag_id,
+            run_id="test_run",
+            try_number=1,
+            dag_version_id=uuid7(),
+        )
+
+        runtime_ti = RuntimeTaskInstance.model_construct(
+            **ti.model_dump(exclude_unset=True), task=task, start_date=timezone.utcnow()
+        )
+
+        def send_side_effect(msg=None, **kwargs):
+            if isinstance(msg, TaskState):
+                raise BrokenPipeError("supervisor IPC broken")
+            return mock.DEFAULT
+
+        mock_supervisor_comms.send.side_effect = send_side_effect
+
+        log = mock.MagicMock()
+        context = runtime_ti.get_template_context()
+        state, _, error = run(runtime_ti, context, log)
+        finalize(runtime_ti, state, context, log, error)
+
+        assert state == TaskInstanceState.FAILED
+        assert runtime_ti._terminal_state_send_failed is True
+        assert listener.state == [TaskInstanceState.RUNNING, TaskInstanceState.FAILED]
+        assert listener.error == error
+
     def test_task_runner_calls_listeners_skipped(self, mocked_parse, mock_supervisor_comms, listener_manager):
         listener = self.CustomListener()
         listener_manager(listener)
