This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ca05a56f6a1 Fix multiple_outputs no-op on deferrable KubernetesPodOperator (#67226)
ca05a56f6a1 is described below

commit ca05a56f6a1d92de8033254d0e65460a01af9606
Author: Paul Mathew <[email protected]>
AuthorDate: Thu May 21 17:53:50 2026 -0400

    Fix multiple_outputs no-op on deferrable KubernetesPodOperator (#67226)
    
    * Fix multiple_outputs no-op on deferrable KubernetesPodOperator
    
    KubernetesPodOperator(do_xcom_push=True, multiple_outputs=True,
    deferrable=True) silently failed to fan out the sidecar's return.json dict
    into per-key XComs — only `return_value` was published. Downstream tasks
    subscripting a key (operator.output["foo"] resolving to xcom_pull(key="foo"))
    got None at runtime with no error.
    
    Root cause: trigger_reentry pushed return_value manually inside a finally
    block and never returned the value to the task runner, so the runner's
    _push_xcom_if_needed (the code that honors multiple_outputs and fans the
    dict out) was bypassed. The sync execute_sync path already returns the
    result for the runner to handle (pod.py:760); this aligns trigger_reentry
    with that same contract.
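
The contract described above can be modelled in a few lines. This is a
minimal sketch, not Airflow code: RecordingTI, runner_push, reentry_before
and reentry_after are hypothetical stand-ins, and runner_push is only a
simplified model of the runner-side push that the message calls
_push_xcom_if_needed.

```python
from typing import Any

XCOM_RETURN_KEY = "return_value"


class RecordingTI:
    """Hypothetical stand-in for a task instance; records every XCom push."""

    def __init__(self) -> None:
        self.pushed: dict[str, Any] = {}

    def xcom_push(self, key: str, value: Any) -> None:
        self.pushed[key] = value


def runner_push(ti: RecordingTI, result: Any, multiple_outputs: bool) -> None:
    """Simplified model of the runner-side push (assumption, not the real code)."""
    if result is None:
        return
    if multiple_outputs:
        for key, value in result.items():
            ti.xcom_push(key, value)
    ti.xcom_push(XCOM_RETURN_KEY, result)


def reentry_before(ti: RecordingTI, sidecar_output: dict) -> None:
    """Old shape: push manually and return nothing to the runner."""
    ti.xcom_push(XCOM_RETURN_KEY, sidecar_output)
    return None


def reentry_after(ti: RecordingTI, sidecar_output: dict) -> dict:
    """New shape: return the value and let the runner do the push."""
    return sidecar_output


def run_task(reentry, sidecar_output: dict) -> dict[str, Any]:
    """Call the reentry function, then hand its result to the modelled runner."""
    ti = RecordingTI()
    runner_push(ti, reentry(ti, sidecar_output), multiple_outputs=True)
    return ti.pushed


before = run_task(reentry_before, {"foo": 1, "bar": 2})
after = run_task(reentry_after, {"foo": 1, "bar": 2})
print(sorted(before))  # only the return_value key is present
print(sorted(after))   # return_value plus one key per dict entry
```

In this model the old shape leaves the runner with None, so the fan-out
branch never runs; the new shape gives the runner the dict to fan out.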
    
    The failure-path manual push is preserved by moving it inside the
    event["status"] != "success" branch above the raise — partial sidecar
    output is still surfaced in XCom when the pod fails, and the behavior is
    now strictly better: the push happens even when the subsequent _clean
    call raises (previously the in-finally push was unreachable in that case).
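
The ordering argument above relies only on plain try/finally semantics.
The following sketch uses hypothetical names (failing_clean,
push_in_finally, push_before_raise) and a list in place of XCom; it
illustrates the control flow, not the operator itself.

```python
pushed: list[str] = []


def failing_clean() -> None:
    """Stand-in for a cleanup step that itself fails."""
    raise RuntimeError("cleanup failed")


def push_in_finally() -> None:
    """Old ordering: the push sits after cleanup inside ``finally``."""
    try:
        raise ValueError("pod failed")
    finally:
        failing_clean()
        pushed.append("old")  # never reached when failing_clean() raises


def push_before_raise() -> None:
    """New ordering: the push runs before the raise, ahead of ``finally``."""
    try:
        pushed.append("new")
        raise ValueError("pod failed")
    finally:
        failing_clean()


for fn in (push_in_finally, push_before_raise):
    try:
        fn()
    except RuntimeError:
        pass  # in both cases the cleanup error is the one that propagates

print(pushed)
```

Only the push placed before the raise is recorded when the cleanup step
fails.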
    
    Fixes #67224
    
    Co-authored-by: Cursor <[email protected]>
    
    * Trigger CI rerun after upload-artifact 403 flake
    
    Co-authored-by: Cursor <[email protected]>
    
    * Apply multiple_outputs fan-out to KPO failure paths consistently
    
    Address review on #67226: the previous commit fixed the success path on
    deferrable KPO but left both sync and async failure paths still pushing
    only ``return_value``, silently dropping ``multiple_outputs`` fan-out.
    
    Add an operator-local ``_push_xcom_with_fan_out`` helper that mirrors the
    task runner's ``_push_xcom_if_needed`` and call it from both failure-path
    manual pushes (sync ``post_complete_action`` and async ``trigger_reentry``).
    All four code paths — sync success/failure, async success/failure — now
    honour ``multiple_outputs`` consistently.
    
    Promoting ``_push_xcom_if_needed`` to a public task-SDK helper would let
    other operators that manually push XCom on failure paths reuse the same
    logic and would be a cleaner long-term fix. Left as a follow-up since
    manual ``XCOM_RETURN_KEY`` pushes are essentially KPO-specific today
    (grep across all providers turns up only one other operator) and the
    local helper keeps this PR's blast radius matched to its scope.
    
    Tests:
    - New ``test_xcom_push_failed_pod_fans_out_for_multiple_outputs`` covers
      the sync failure path.
    - New ``test_async_trigger_reentry_failure_fans_out_for_multiple_outputs``
      covers the async failure path.
    
    Also incorporate review nits: drop unnecessary comments, switch ``is`` to
    ``==`` for dict equality checks, and trim the historical context from the
    new test docstring (moved to the PR description).
    
    Co-authored-by: Cursor <[email protected]>
    
    * Use explicit ``is None`` guard in ``_push_xcom_with_fan_out``
    
    Address review from @jscheffl on #67226: the previous ``if not value:``
    guard would skip the manual XCom push for any falsy-but-non-None payload
    (``False``, ``0``, ``""``, ``[]``, ``{}``) — all of which are legitimate
    sidecar outputs that downstream tasks may rely on. Switch to ``is None``
    so only the "nothing extracted" case is skipped, matching the task
    runner's ``_push_xcom_if_needed`` semantics (which also guards on
    ``xcom_value is None``).
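
The difference between the two guards can be checked directly against the
payloads listed above. A minimal sketch; payloads, dropped_by_truthiness
and dropped_by_none_check are illustrative names only.

```python
# Candidate sidecar payloads: every falsy value named above, plus None.
payloads = [False, 0, "", [], {}, None]

# ``if not value:`` skips the push for every falsy payload.
dropped_by_truthiness = [p for p in payloads if not p]

# ``if value is None:`` skips the push only when nothing was extracted.
dropped_by_none_check = [p for p in payloads if p is None]

print(len(dropped_by_truthiness), len(dropped_by_none_check))
```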
    
    Also fix D213 docstring style flagged by ruff (multi-line summary should
    start on the second line).
    
    Co-authored-by: Cursor <[email protected]>
    
    ---------
    
    Co-authored-by: Cursor <[email protected]>
---
 .../providers/cncf/kubernetes/operators/pod.py     |  38 +++++++-
 .../unit/cncf/kubernetes/operators/test_pod.py     | 108 +++++++++++++++++++--
 2 files changed, 133 insertions(+), 13 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index c7c1715700f..6e935290b53 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -28,7 +28,7 @@ import os
 import re
 import shlex
 import string
-from collections.abc import Callable, Container, Iterable, Sequence
+from collections.abc import Callable, Container, Iterable, Mapping, Sequence
 from contextlib import AbstractContextManager, suppress
 from enum import Enum
 from functools import cached_property
@@ -1074,12 +1074,14 @@ class KubernetesPodOperator(BaseOperator):
                         "Trigger emitted an %s event, failing the task: %s", event["status"], event["message"]
                     )
                     message = event.get("stack_trace", event["message"])
+                    if self.do_xcom_push:
+                        self._push_xcom_with_fan_out(context["ti"], xcom_sidecar_output)
                     raise AirflowException(message)
         finally:
             self._clean(event=event, context=context, result=xcom_sidecar_output)
 
-            if self.do_xcom_push and xcom_sidecar_output:
-                context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)
+        if self.do_xcom_push:
+            return xcom_sidecar_output
 
     def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None:
         if self.pod is None:
@@ -1109,6 +1111,32 @@ class KubernetesPodOperator(BaseOperator):
                 result=result,
             )
 
+    def _push_xcom_with_fan_out(self, ti: Any, value: Any) -> None:
+        """
+        Push ``return_value`` and, when ``multiple_outputs`` is set, also fan a dict out per key.
+
+        Mirrors the task runner's ``_push_xcom_if_needed`` so the failure-path manual pushes
+        in ``cleanup`` (sync) and ``trigger_reentry`` (async) honour ``multiple_outputs`` —
+        previously they pushed only ``return_value``, silently dropping per-key fan-out.
+        On success both paths return the value and let the runner perform the push instead.
+        """
+        if value is None:
+            return
+        if self.multiple_outputs:
+            if not isinstance(value, Mapping):
+                raise TypeError(
+                    f"Returned output was type {type(value)} expected dictionary for multiple_outputs"
+                )
+            for key in value:
+                if not isinstance(key, str):
+                    raise TypeError(
+                        "Returned dictionary keys must be strings when using "
+                        f"multiple_outputs, found {key} ({type(key)}) instead"
+                    )
+            for k, v in value.items():
+                ti.xcom_push(k, v)
+        ti.xcom_push(XCOM_RETURN_KEY, value)
+
     @tenacity.retry(
         stop=tenacity.stop_after_attempt(3),
         wait=tenacity.wait_exponential(max=15),
@@ -1190,9 +1218,9 @@ class KubernetesPodOperator(BaseOperator):
             failed = pod_phase != PodPhase.SUCCEEDED
 
         if failed:
-            if self.do_xcom_push and xcom_result and context:
+            if self.do_xcom_push and context:
                 # Ensure that existing XCom is pushed even in case of failure
-                context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_result)
+                self._push_xcom_with_fan_out(context["ti"], xcom_result)
 
             if self.log_events_on_failure:
                 self._read_pod_container_states(pod, reraise=False)
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 4141c5c594f..8f130788107 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -17,6 +17,7 @@
 from __future__ import annotations
 
 import datetime
+import json
 import re
 from contextlib import contextmanager, nullcontext
 from typing import TYPE_CHECKING
@@ -1883,6 +1884,33 @@ class TestKubernetesPodOperator:
 
         context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, {"Test key": "Test value"})
 
+    @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+    @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+    @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+    def test_xcom_push_failed_pod_fans_out_for_multiple_outputs(
+        self, remote_pod, mock_await, mock_extract_xcom
+    ):
+        """On the sync failure path, ``multiple_outputs=True`` must fan the sidecar dict out
+        into per-key XComs in addition to the ``return_value`` push — matching the task runner
+        behaviour applied on the success path.
+        """
+        k = KubernetesPodOperator(
+            task_id="task", on_finish_action="delete_pod", do_xcom_push=True, multiple_outputs=True
+        )
+
+        remote_pod.return_value.status.phase = "Failed"
+        sidecar_output = {"export_arn": "arn:aws:dynamodb:::export/x", "s3_uri": "s3://b/p"}
+        mock_extract_xcom.return_value = json.dumps(sidecar_output)
+        context = create_context(k)
+        context["ti"].xcom_push = MagicMock()
+
+        with pytest.raises(AirflowException):
+            k.execute(context=context)
+
+        context["ti"].xcom_push.assert_any_call("export_arn", "arn:aws:dynamodb:::export/x")
+        context["ti"].xcom_push.assert_any_call("s3_uri", "s3://b/p")
+        context["ti"].xcom_push.assert_any_call(XCOM_RETURN_KEY, sidecar_output)
+
     @pytest.mark.asyncio
     @pytest.mark.parametrize(
         ("kwargs", "actual_exit_code", "expected_exc"),
@@ -3451,13 +3479,14 @@ def test_async_kpo_wait_termination_before_cleanup_on_success(
     # check if it gets the pod
     mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME, TEST_NAMESPACE)
 
-    # assert that the xcom are extracted/not extracted
     if do_xcom_push:
         mock_extract_xcom.assert_called_once()
-        context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, mock_extract_xcom.return_value)
+        assert result == mock_extract_xcom.return_value
+        context["ti"].xcom_push.assert_not_called()
     else:
         mock_extract_xcom.assert_not_called()
         assert result is None
+        context["ti"].xcom_push.assert_not_called()
 
     # check if it waits for the pod to complete
     assert read_pod_mock.call_count == 3
@@ -3498,16 +3527,14 @@ def test_async_kpo_wait_termination_before_cleanup_on_failure(
     # check if it gets the pod
     mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME, TEST_NAMESPACE)
 
-    # assert that it does not push the xcom
-    ti_mock.xcom_push.assert_not_called()
-
+    # The failure-path push happens before the ``finally``-block ``_clean`` runs,
+    # so even a failing cleanup (simulated here via side_effect) doesn't suppress it.
     if do_xcom_push:
-        # assert that the xcom are not extracted if do_xcom_push is False
         mock_extract_xcom.assert_called_once()
+        ti_mock.xcom_push.assert_called_once_with(XCOM_RETURN_KEY, mock_extract_xcom.return_value)
     else:
-        # but that it is extracted when do_xcom_push is true because the sidecare
-        # needs to be terminated
         mock_extract_xcom.assert_not_called()
+        ti_mock.xcom_push.assert_not_called()
 
     # check if it waits for the pod to complete
     assert read_pod_mock.call_count == 3
@@ -3516,6 +3543,71 @@ def test_async_kpo_wait_termination_before_cleanup_on_failure(
     post_complete_action.assert_called_once()
 
 
+@patch(KUB_OP_PATH.format("extract_xcom"))
+@patch(KUB_OP_PATH.format("post_complete_action"))
+@patch(HOOK_CLASS)
+def test_async_trigger_reentry_returns_sidecar_output_for_multiple_outputs(
+    mocked_hook, post_complete_action, mock_extract_xcom
+):
+    """On the success path with ``multiple_outputs=True``, ``trigger_reentry`` returns the
+    sidecar dict and does not push ``return_value`` itself — the task runner's
+    ``_push_xcom_if_needed`` handles both the ``return_value`` push and the per-key fan-out.
+    """
+    metadata = {"metadata.name": TEST_NAME, "metadata.namespace": TEST_NAMESPACE}
+    succeeded_state = mock.MagicMock(**metadata, **{"status.phase": "Succeeded"})
+    mocked_hook.return_value.get_pod.return_value = succeeded_state
+    mocked_hook.return_value.core_v1_client.read_namespaced_pod.return_value = succeeded_state
+
+    sidecar_output = {"export_arn": "arn:aws:dynamodb:::export/x", "s3_uri": "s3://b/p"}
+    mock_extract_xcom.return_value = sidecar_output
+
+    k = KubernetesPodOperator(task_id="task", deferrable=True, do_xcom_push=True, multiple_outputs=True)
+    context = create_context(k)
+    context["ti"].xcom_push = MagicMock()
+
+    success_event = {
+        "status": "success",
+        "message": TEST_SUCCESS_MESSAGE,
+        "name": TEST_NAME,
+        "namespace": TEST_NAMESPACE,
+    }
+
+    result = k.trigger_reentry(context, success_event)
+
+    assert result == sidecar_output
+    context["ti"].xcom_push.assert_not_called()
+
+
+@patch(KUB_OP_PATH.format("extract_xcom"))
+@patch(KUB_OP_PATH.format("post_complete_action"))
+@patch(HOOK_CLASS)
+def test_async_trigger_reentry_failure_fans_out_for_multiple_outputs(
+    mocked_hook, post_complete_action, mock_extract_xcom
+):
+    """On the async failure path with ``multiple_outputs=True``, ``trigger_reentry`` must fan
+    the partial sidecar dict out into per-key XComs before raising — matching the sync failure
+    path so behaviour is consistent regardless of execution mode.
+    """
+    metadata = {"metadata.name": TEST_NAME, "metadata.namespace": TEST_NAMESPACE}
+    failed_state = mock.MagicMock(**metadata, **{"status.phase": "Failed"})
+    mocked_hook.return_value.get_pod.return_value = failed_state
+    mocked_hook.return_value.core_v1_client.read_namespaced_pod.return_value = failed_state
+
+    sidecar_output = {"export_arn": "arn:aws:dynamodb:::export/x", "s3_uri": "s3://b/p"}
+    mock_extract_xcom.return_value = sidecar_output
+
+    k = KubernetesPodOperator(task_id="task", deferrable=True, do_xcom_push=True, multiple_outputs=True)
+    ti_mock = MagicMock()
+    failure_event = {"status": "failed", "message": "error", "name": TEST_NAME, "namespace": TEST_NAMESPACE}
+
+    with pytest.raises(AirflowException):
+        k.trigger_reentry({"ti": ti_mock}, failure_event)
+
+    ti_mock.xcom_push.assert_any_call("export_arn", "arn:aws:dynamodb:::export/x")
+    ti_mock.xcom_push.assert_any_call("s3_uri", "s3://b/p")
+    ti_mock.xcom_push.assert_any_call(XCOM_RETURN_KEY, sidecar_output)
+
+
 def test_default_container_logs():
     class TestSubclassKPO(KubernetesPodOperator):
         BASE_CONTAINER_NAME = "test-base-container"
