This is an automated email from the ASF dual-hosted git repository.
jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 34d6f4c33f8 fix/issue-66592: Respecting unwrap_single for
non-deferrable execution (#66596)
34d6f4c33f8 is described below
commit 34d6f4c33f83bfe0ec13344baf472cbb391c91dc
Author: Jake McGrath <[email protected]>
AuthorDate: Sat May 9 12:40:14 2026 -0400
fix/issue-66592: Respecting unwrap_single for non-deferrable execution
(#66596)
---
.../providers/cncf/kubernetes/operators/job.py | 2 +-
.../unit/cncf/kubernetes/operators/test_job.py | 42 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 1 deletion(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
index 49eef06c33c..510346d34ad 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py
@@ -255,7 +255,7 @@ class KubernetesJobOperator(KubernetesPodOperator):
f"Kubernetes job '{self.job.metadata.name}' is failed with
error '{error_message}'"
)
if self.do_xcom_push:
- return xcom_result
+ return xcom_result[0] if self.unwrap_single and
len(xcom_result) == 1 else xcom_result
def execute_deferrable(self):
self.defer(
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
index 4e1bd0c0eb0..6bde7c4772d 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_job.py
@@ -1119,6 +1119,48 @@ class TestKubernetesJobOperator:
assert result == return_value
assert mock_client.return_value.list_namespaced_pod.call_count ==
successful_try + 1
+ @pytest.mark.non_db_test_override
+ @pytest.mark.parametrize(
+ ("unwrap_single", "parallelism", "expected"),
+ [
+ (True, 1, "xcom-result"),
+ (True, 2, ["xcom-result", "xcom-result"]),
+ (False, 1, ["xcom-result"]),
+ ],
+ )
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.extract_xcom"))
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
+
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"),
mock.MagicMock())
+ @patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.create_job"),
mock.MagicMock())
+ @patch(f"{POD_MANAGER_CLASS}.await_xcom_sidecar_container_start",
mock.MagicMock())
+ @patch(f"{POD_MANAGER_CLASS}.await_container_completion", mock.MagicMock())
+ @patch(HOOK_CLASS)
+ def test_execute_xcom_respects_unwrap_single(
+ self,
+ mock_hook,
+ mock_get_pods,
+ mock_extract_xcom,
+ unwrap_single,
+ parallelism,
+ expected,
+ ):
+ mock_hook.return_value.is_job_failed.return_value = None
+ mock_extract_xcom.return_value = "xcom-result"
+ mock_get_pods.return_value = [mock.MagicMock() for _ in
range(parallelism)]
+
+ op = KubernetesJobOperator(
+ task_id="test_task_id",
+ wait_until_job_complete=True,
+ do_xcom_push=True,
+ get_logs=False,
+ parallelism=parallelism,
+ unwrap_single=unwrap_single,
+ )
+
+ result = op.execute(context=dict(ti=mock.MagicMock()))
+
+ assert result == expected
+
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.get_pods"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.build_job_request_obj"))
@patch(JOB_OPERATORS_PATH.format("KubernetesJobOperator.hook"),
new_callable=mock.PropertyMock)