This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3b6a24bba11 Fix XCom directory creation logic in Kubernetes decorator
(#56545) (#59347)
3b6a24bba11 is described below
commit 3b6a24bba116551665572f4eea3b0cebc44bd901
Author: Aaron Chen <[email protected]>
AuthorDate: Sat Dec 13 23:39:27 2025 +0800
Fix XCom directory creation logic in Kubernetes decorator (#56545) (#59347)
* Fix XCom directory creation logic in Kubernetes decorator and add tests
for do_xcom_push behavior (#56545)
* fix CI test error
---
.../cncf/kubernetes/decorators/kubernetes.py | 9 ++++-
.../cncf/kubernetes/decorators/test_kubernetes.py | 47 ++++++++++++++++++++++
2 files changed, 54 insertions(+), 2 deletions(-)
diff --git
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
index 6d18732264f..f1a2fa4395e 100644
---
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
+++
b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
@@ -87,7 +87,13 @@ class _KubernetesDecoratedOperator(DecoratedOperator,
KubernetesPodOperator):
def _generate_cmds(self) -> list[str]:
script_filename = "/tmp/script.py"
input_filename = "/tmp/script.in"
- output_filename = "/airflow/xcom/return.json"
+
+ if getattr(self, "do_xcom_push", False):
+ output_filename = "/airflow/xcom/return.json"
+ make_xcom_dir_cmd = "mkdir -p /airflow/xcom"
+ else:
+ output_filename = "/dev/null"
+ make_xcom_dir_cmd = ":" # shell no-op
write_local_script_file_cmd = (
f"{_generate_decoded_command(quote(_PYTHON_SCRIPT_ENV),
quote(script_filename))}"
@@ -95,7 +101,6 @@ class _KubernetesDecoratedOperator(DecoratedOperator,
KubernetesPodOperator):
write_local_input_file_cmd = (
f"{_generate_decoded_command(quote(_PYTHON_INPUT_ENV),
quote(input_filename))}"
)
- make_xcom_dir_cmd = "mkdir -p /airflow/xcom"
exec_python_cmd = f"python {script_filename} {input_filename}
{output_filename}"
return [
"bash",
diff --git
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py
index 68bc1943799..c6ea4f3e38e 100644
---
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py
+++
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py
@@ -128,3 +128,50 @@ class
TestKubernetesDecorator(TestKubernetesDecoratorsBase):
# Second container is xcom image
assert containers[1].image == XCOM_IMAGE
assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom"
+
+ @pytest.mark.asyncio
+ @pytest.mark.parametrize(
+ ("do_xcom_push", "expected_output", "expects_mkdir"),
+ [
+ (False, "/dev/null", False),
+ (True, "/airflow/xcom/return.json", True),
+ ],
+ ids=["without_xcom_push", "with_xcom_push"],
+ )
+ def test_generated_command_respects_do_xcom_push(
+ self, do_xcom_push: bool, expected_output: str, expects_mkdir: bool
+ ):
+ with self.dag_maker:
+
+ @task.kubernetes(
+ image="python:3.10-slim-buster",
+ in_cluster=False,
+ cluster_context="default",
+ config_file="/tmp/fake_file",
+ namespace="default",
+ )
+ def f():
+ return {"key": "value"}
+
+ k8s_task = f.override(task_id="my_task_id",
do_xcom_push=do_xcom_push)()
+
+ if do_xcom_push:
+
self.mock_hook.return_value.get_xcom_sidecar_container_image.return_value =
XCOM_IMAGE
+
self.mock_hook.return_value.get_xcom_sidecar_container_resources.return_value =
{
+ "requests": {"cpu": "1m", "memory": "10Mi"},
+ "limits": {"cpu": "1m", "memory": "50Mi"},
+ }
+
+ self.execute_task(k8s_task)
+ containers =
self.mock_create_pod.call_args.kwargs["pod"].spec.containers
+ assert len(containers) == (2 if do_xcom_push else 1)
+
+ generated_command = containers[0].command
+ assert generated_command
+ assert len(generated_command) >= 3
+
+ bash_command = generated_command[-1]
+ assert expected_output in bash_command
+ assert ("/airflow/xcom" in bash_command) == expects_mkdir
+ if not expects_mkdir:
+ assert " && : && " in bash_command