This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 3b6a24bba11 Fix XCom directory creation logic in Kubernetes decorator (#56545) (#59347)
3b6a24bba11 is described below

commit 3b6a24bba116551665572f4eea3b0cebc44bd901
Author: Aaron Chen <[email protected]>
AuthorDate: Sat Dec 13 23:39:27 2025 +0800

    Fix XCom directory creation logic in Kubernetes decorator (#56545) (#59347)
    
    * Fix XCom directory creation logic in Kubernetes decorator and add tests for do_xcom_push behavior (#56545)
    
    * fix CI test error
---
 .../cncf/kubernetes/decorators/kubernetes.py       |  9 ++++-
 .../cncf/kubernetes/decorators/test_kubernetes.py  | 47 ++++++++++++++++++++++
 2 files changed, 54 insertions(+), 2 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
index 6d18732264f..f1a2fa4395e 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/decorators/kubernetes.py
@@ -87,7 +87,13 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
     def _generate_cmds(self) -> list[str]:
         script_filename = "/tmp/script.py"
         input_filename = "/tmp/script.in"
-        output_filename = "/airflow/xcom/return.json"
+
+        if getattr(self, "do_xcom_push", False):
+            output_filename = "/airflow/xcom/return.json"
+            make_xcom_dir_cmd = "mkdir -p /airflow/xcom"
+        else:
+            output_filename = "/dev/null"
+            make_xcom_dir_cmd = ":"  # shell no-op
 
         write_local_script_file_cmd = (
             f"{_generate_decoded_command(quote(_PYTHON_SCRIPT_ENV), quote(script_filename))}"
@@ -95,7 +101,6 @@ class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
         write_local_input_file_cmd = (
             f"{_generate_decoded_command(quote(_PYTHON_INPUT_ENV), quote(input_filename))}"
         )
-        make_xcom_dir_cmd = "mkdir -p /airflow/xcom"
         exec_python_cmd = f"python {script_filename} {input_filename} {output_filename}"
         return [
             "bash",
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py
index 68bc1943799..c6ea4f3e38e 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/decorators/test_kubernetes.py
@@ -128,3 +128,50 @@ class TestKubernetesDecorator(TestKubernetesDecoratorsBase):
         # Second container is xcom image
         assert containers[1].image == XCOM_IMAGE
         assert containers[1].volume_mounts[0].mount_path == "/airflow/xcom"
+
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize(
+        ("do_xcom_push", "expected_output", "expects_mkdir"),
+        [
+            (False, "/dev/null", False),
+            (True, "/airflow/xcom/return.json", True),
+        ],
+        ids=["without_xcom_push", "with_xcom_push"],
+    )
+    def test_generated_command_respects_do_xcom_push(
+        self, do_xcom_push: bool, expected_output: str, expects_mkdir: bool
+    ):
+        with self.dag_maker:
+
+            @task.kubernetes(
+                image="python:3.10-slim-buster",
+                in_cluster=False,
+                cluster_context="default",
+                config_file="/tmp/fake_file",
+                namespace="default",
+            )
+            def f():
+                return {"key": "value"}
+
+            k8s_task = f.override(task_id="my_task_id", do_xcom_push=do_xcom_push)()
+
+        if do_xcom_push:
+            self.mock_hook.return_value.get_xcom_sidecar_container_image.return_value = XCOM_IMAGE
+            self.mock_hook.return_value.get_xcom_sidecar_container_resources.return_value = {
+                "requests": {"cpu": "1m", "memory": "10Mi"},
+                "limits": {"cpu": "1m", "memory": "50Mi"},
+            }
+
+        self.execute_task(k8s_task)
+        containers = self.mock_create_pod.call_args.kwargs["pod"].spec.containers
+        assert len(containers) == (2 if do_xcom_push else 1)
+
+        generated_command = containers[0].command
+        assert generated_command
+        assert len(generated_command) >= 3
+
+        bash_command = generated_command[-1]
+        assert expected_output in bash_command
+        assert ("/airflow/xcom" in bash_command) == expects_mkdir
+        if not expects_mkdir:
+            assert " && : && " in bash_command

Reply via email to