This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new ca05a56f6a1 Fix multiple_outputs no-op on deferrable KubernetesPodOperator (#67226)
ca05a56f6a1 is described below
commit ca05a56f6a1d92de8033254d0e65460a01af9606
Author: Paul Mathew <[email protected]>
AuthorDate: Thu May 21 17:53:50 2026 -0400
Fix multiple_outputs no-op on deferrable KubernetesPodOperator (#67226)
* Fix multiple_outputs no-op on deferrable KubernetesPodOperator
KubernetesPodOperator(do_xcom_push=True, multiple_outputs=True,
deferrable=True) silently failed to fan out the sidecar's return.json dict
into per-key XComs — only `return_value` was published. Downstream tasks
subscripting a key (operator.output["foo"], which resolves to
xcom_pull(key="foo")) got None at runtime with no error.
Root cause: trigger_reentry pushed return_value manually inside a finally
block and never returned the value to the task runner, so the runner's
_push_xcom_if_needed (the code that honors multiple_outputs and fans the
dict out) was bypassed. The sync execute_sync path already returns the
result for the runner to handle (pod.py:760); this aligns trigger_reentry
with that same contract.
The failure-path manual push is preserved by moving it inside the
event["status"] != "success" branch, above the raise, so partial sidecar
output is still surfaced in XCom when the pod fails. The behavior is also
strictly better than before: the push now happens even when the subsequent
_clean call raises (previously the push in the finally block was unreachable
in that case).
Fixes #67224
Co-authored-by: Cursor <[email protected]>
* Trigger CI rerun after upload-artifact 403 flake
Co-authored-by: Cursor <[email protected]>
* Apply multiple_outputs fan-out to KPO failure paths consistently
Address review on #67226: the previous commit fixed the success path on
deferrable KPO but left both sync and async failure paths still pushing
only ``return_value``, silently dropping ``multiple_outputs`` fan-out.
Add an operator-local ``_push_xcom_with_fan_out`` helper that mirrors the
task runner's ``_push_xcom_if_needed`` and call it from both failure-path
manual pushes (sync ``post_complete_action`` and async ``trigger_reentry``).
All four code paths — sync success/failure, async success/failure — now
honour ``multiple_outputs`` consistently.
Promoting ``_push_xcom_if_needed`` to a public task-SDK helper would let
other operators that manually push XCom on failure paths reuse the same
logic and would be a cleaner long-term fix. Left as a follow-up since
manual ``XCOM_RETURN_KEY`` pushes are essentially KPO-specific today
(grep across all providers turns up only one other operator) and the
local helper keeps this PR's blast radius matched to its scope.
Tests:
- New ``test_xcom_push_failed_pod_fans_out_for_multiple_outputs`` covers
the sync failure path.
- New ``test_async_trigger_reentry_failure_fans_out_for_multiple_outputs``
covers the async failure path.
Also incorporate review nits: drop unnecessary comments, switch ``is`` to
``==`` for dict equality checks, and trim the historical context from the
new test docstring (moved to the PR description).
Co-authored-by: Cursor <[email protected]>
* Use explicit ``is None`` guard in ``_push_xcom_with_fan_out``
Address review from @jscheffl on #67226: the previous ``if not value:``
guard would skip the manual XCom push for any falsy-but-non-None payload
(``False``, ``0``, ``""``, ``[]``, ``{}``) — all of which are legitimate
sidecar outputs that downstream tasks may rely on. Switch to ``is None``
so only the "nothing extracted" case is skipped, matching the task
runner's ``_push_xcom_if_needed`` semantics (which also guards on
``xcom_value is None``).
Also fix D213 docstring style flagged by ruff (multi-line summary should
start on the second line).
Co-authored-by: Cursor <[email protected]>
---------
Co-authored-by: Cursor <[email protected]>
---
.../providers/cncf/kubernetes/operators/pod.py | 38 +++++++-
.../unit/cncf/kubernetes/operators/test_pod.py | 108 +++++++++++++++++++--
2 files changed, 133 insertions(+), 13 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index c7c1715700f..6e935290b53 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -28,7 +28,7 @@ import os
import re
import shlex
import string
-from collections.abc import Callable, Container, Iterable, Sequence
+from collections.abc import Callable, Container, Iterable, Mapping, Sequence
from contextlib import AbstractContextManager, suppress
from enum import Enum
from functools import cached_property
@@ -1074,12 +1074,14 @@ class KubernetesPodOperator(BaseOperator):
"Trigger emitted an %s event, failing the task: %s",
event["status"], event["message"]
)
message = event.get("stack_trace", event["message"])
+ if self.do_xcom_push:
+ self._push_xcom_with_fan_out(context["ti"],
xcom_sidecar_output)
raise AirflowException(message)
finally:
self._clean(event=event, context=context,
result=xcom_sidecar_output)
- if self.do_xcom_push and xcom_sidecar_output:
- context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_sidecar_output)
+ if self.do_xcom_push:
+ return xcom_sidecar_output
def _clean(self, event: dict[str, Any], result: dict | None, context:
Context) -> None:
if self.pod is None:
@@ -1109,6 +1111,32 @@ class KubernetesPodOperator(BaseOperator):
result=result,
)
+ def _push_xcom_with_fan_out(self, ti: Any, value: Any) -> None:
+ """
+ Push ``return_value`` and, when ``multiple_outputs`` is set, also fan
a dict out per key.
+
+ Mirrors the task runner's ``_push_xcom_if_needed`` so the failure-path
manual pushes
+ in ``cleanup`` (sync) and ``trigger_reentry`` (async) honour
``multiple_outputs`` —
+ previously they pushed only ``return_value``, silently dropping
per-key fan-out.
+ On success both paths return the value and let the runner perform the
push instead.
+ """
+ if value is None:
+ return
+ if self.multiple_outputs:
+ if not isinstance(value, Mapping):
+ raise TypeError(
+ f"Returned output was type {type(value)} expected
dictionary for multiple_outputs"
+ )
+ for key in value:
+ if not isinstance(key, str):
+ raise TypeError(
+ "Returned dictionary keys must be strings when using "
+ f"multiple_outputs, found {key} ({type(key)}) instead"
+ )
+ for k, v in value.items():
+ ti.xcom_push(k, v)
+ ti.xcom_push(XCOM_RETURN_KEY, value)
+
@tenacity.retry(
stop=tenacity.stop_after_attempt(3),
wait=tenacity.wait_exponential(max=15),
@@ -1190,9 +1218,9 @@ class KubernetesPodOperator(BaseOperator):
failed = pod_phase != PodPhase.SUCCEEDED
if failed:
- if self.do_xcom_push and xcom_result and context:
+ if self.do_xcom_push and context:
# Ensure that existing XCom is pushed even in case of failure
- context["ti"].xcom_push(XCOM_RETURN_KEY, xcom_result)
+ self._push_xcom_with_fan_out(context["ti"], xcom_result)
if self.log_events_on_failure:
self._read_pod_container_states(pod, reraise=False)
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
index 4141c5c594f..8f130788107 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py
@@ -17,6 +17,7 @@
from __future__ import annotations
import datetime
+import json
import re
from contextlib import contextmanager, nullcontext
from typing import TYPE_CHECKING
@@ -1883,6 +1884,33 @@ class TestKubernetesPodOperator:
context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY, {"Test
key": "Test value"})
+ @patch(f"{POD_MANAGER_CLASS}.extract_xcom")
+ @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start")
+ @patch(f"{POD_MANAGER_CLASS}.await_pod_completion")
+ def test_xcom_push_failed_pod_fans_out_for_multiple_outputs(
+ self, remote_pod, mock_await, mock_extract_xcom
+ ):
+ """On the sync failure path, ``multiple_outputs=True`` must fan the
sidecar dict out
+ into per-key XComs in addition to the ``return_value`` push — matching
the task runner
+ behaviour applied on the success path.
+ """
+ k = KubernetesPodOperator(
+ task_id="task", on_finish_action="delete_pod", do_xcom_push=True,
multiple_outputs=True
+ )
+
+ remote_pod.return_value.status.phase = "Failed"
+ sidecar_output = {"export_arn": "arn:aws:dynamodb:::export/x",
"s3_uri": "s3://b/p"}
+ mock_extract_xcom.return_value = json.dumps(sidecar_output)
+ context = create_context(k)
+ context["ti"].xcom_push = MagicMock()
+
+ with pytest.raises(AirflowException):
+ k.execute(context=context)
+
+ context["ti"].xcom_push.assert_any_call("export_arn",
"arn:aws:dynamodb:::export/x")
+ context["ti"].xcom_push.assert_any_call("s3_uri", "s3://b/p")
+ context["ti"].xcom_push.assert_any_call(XCOM_RETURN_KEY,
sidecar_output)
+
@pytest.mark.asyncio
@pytest.mark.parametrize(
("kwargs", "actual_exit_code", "expected_exc"),
@@ -3451,13 +3479,14 @@ def
test_async_kpo_wait_termination_before_cleanup_on_success(
# check if it gets the pod
mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME,
TEST_NAMESPACE)
- # assert that the xcom are extracted/not extracted
if do_xcom_push:
mock_extract_xcom.assert_called_once()
- context["ti"].xcom_push.assert_called_with(XCOM_RETURN_KEY,
mock_extract_xcom.return_value)
+ assert result == mock_extract_xcom.return_value
+ context["ti"].xcom_push.assert_not_called()
else:
mock_extract_xcom.assert_not_called()
assert result is None
+ context["ti"].xcom_push.assert_not_called()
# check if it waits for the pod to complete
assert read_pod_mock.call_count == 3
@@ -3498,16 +3527,14 @@ def
test_async_kpo_wait_termination_before_cleanup_on_failure(
# check if it gets the pod
mocked_hook.return_value.get_pod.assert_called_once_with(TEST_NAME,
TEST_NAMESPACE)
- # assert that it does not push the xcom
- ti_mock.xcom_push.assert_not_called()
-
+ # The failure-path push happens before the ``finally``-block ``_clean``
runs,
+ # so even a failing cleanup (simulated here via side_effect) doesn't
suppress it.
if do_xcom_push:
- # assert that the xcom are not extracted if do_xcom_push is False
mock_extract_xcom.assert_called_once()
+ ti_mock.xcom_push.assert_called_once_with(XCOM_RETURN_KEY,
mock_extract_xcom.return_value)
else:
- # but that it is extracted when do_xcom_push is true because the
sidecare
- # needs to be terminated
mock_extract_xcom.assert_not_called()
+ ti_mock.xcom_push.assert_not_called()
# check if it waits for the pod to complete
assert read_pod_mock.call_count == 3
@@ -3516,6 +3543,71 @@ def
test_async_kpo_wait_termination_before_cleanup_on_failure(
post_complete_action.assert_called_once()
+@patch(KUB_OP_PATH.format("extract_xcom"))
+@patch(KUB_OP_PATH.format("post_complete_action"))
+@patch(HOOK_CLASS)
+def test_async_trigger_reentry_returns_sidecar_output_for_multiple_outputs(
+ mocked_hook, post_complete_action, mock_extract_xcom
+):
+ """On the success path with ``multiple_outputs=True``, ``trigger_reentry``
returns the
+ sidecar dict and does not push ``return_value`` itself — the task runner's
+ ``_push_xcom_if_needed`` handles both the ``return_value`` push and the
per-key fan-out.
+ """
+ metadata = {"metadata.name": TEST_NAME, "metadata.namespace":
TEST_NAMESPACE}
+ succeeded_state = mock.MagicMock(**metadata, **{"status.phase":
"Succeeded"})
+ mocked_hook.return_value.get_pod.return_value = succeeded_state
+ mocked_hook.return_value.core_v1_client.read_namespaced_pod.return_value =
succeeded_state
+
+ sidecar_output = {"export_arn": "arn:aws:dynamodb:::export/x", "s3_uri":
"s3://b/p"}
+ mock_extract_xcom.return_value = sidecar_output
+
+ k = KubernetesPodOperator(task_id="task", deferrable=True,
do_xcom_push=True, multiple_outputs=True)
+ context = create_context(k)
+ context["ti"].xcom_push = MagicMock()
+
+ success_event = {
+ "status": "success",
+ "message": TEST_SUCCESS_MESSAGE,
+ "name": TEST_NAME,
+ "namespace": TEST_NAMESPACE,
+ }
+
+ result = k.trigger_reentry(context, success_event)
+
+ assert result == sidecar_output
+ context["ti"].xcom_push.assert_not_called()
+
+
+@patch(KUB_OP_PATH.format("extract_xcom"))
+@patch(KUB_OP_PATH.format("post_complete_action"))
+@patch(HOOK_CLASS)
+def test_async_trigger_reentry_failure_fans_out_for_multiple_outputs(
+ mocked_hook, post_complete_action, mock_extract_xcom
+):
+ """On the async failure path with ``multiple_outputs=True``,
``trigger_reentry`` must fan
+ the partial sidecar dict out into per-key XComs before raising — matching
the sync failure
+ path so behaviour is consistent regardless of execution mode.
+ """
+ metadata = {"metadata.name": TEST_NAME, "metadata.namespace":
TEST_NAMESPACE}
+ failed_state = mock.MagicMock(**metadata, **{"status.phase": "Failed"})
+ mocked_hook.return_value.get_pod.return_value = failed_state
+ mocked_hook.return_value.core_v1_client.read_namespaced_pod.return_value =
failed_state
+
+ sidecar_output = {"export_arn": "arn:aws:dynamodb:::export/x", "s3_uri":
"s3://b/p"}
+ mock_extract_xcom.return_value = sidecar_output
+
+ k = KubernetesPodOperator(task_id="task", deferrable=True,
do_xcom_push=True, multiple_outputs=True)
+ ti_mock = MagicMock()
+ failure_event = {"status": "failed", "message": "error", "name":
TEST_NAME, "namespace": TEST_NAMESPACE}
+
+ with pytest.raises(AirflowException):
+ k.trigger_reentry({"ti": ti_mock}, failure_event)
+
+ ti_mock.xcom_push.assert_any_call("export_arn",
"arn:aws:dynamodb:::export/x")
+ ti_mock.xcom_push.assert_any_call("s3_uri", "s3://b/p")
+ ti_mock.xcom_push.assert_any_call(XCOM_RETURN_KEY, sidecar_output)
+
+
def test_default_container_logs():
class TestSubclassKPO(KubernetesPodOperator):
BASE_CONTAINER_NAME = "test-base-container"