This is an automated email from the ASF dual-hosted git repository.

bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c67a2b0413 Use constant for empty xcom result sentinel (#35451)
c67a2b0413 is described below

commit c67a2b0413994e799ed3f1969d7f0194683cba13
Author: Daniel Standish <[email protected]>
AuthorDate: Sun Nov 5 07:47:54 2023 -0800

    Use constant for empty xcom result sentinel (#35451)
    
    Define things in one place.
---
 airflow/providers/cncf/kubernetes/operators/pod.py     |  4 +++-
 airflow/providers/cncf/kubernetes/utils/pod_manager.py | 13 +++++++++++--
 2 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py 
b/airflow/providers/cncf/kubernetes/operators/pod.py
index 56abd98350..5b58b1bedb 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -55,6 +55,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import 
PodGenerator
 from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
 from airflow.providers.cncf.kubernetes.utils import xcom_sidecar  # type: 
ignore[attr-defined]
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+    EMPTY_XCOM_RESULT,
     OnFinishAction,
     PodLaunchFailedException,
     PodManager,
@@ -576,7 +577,7 @@ class KubernetesPodOperator(BaseOperator):
     def extract_xcom(self, pod: k8s.V1Pod):
         """Retrieve xcom value and kill xcom sidecar container."""
         result = self.pod_manager.extract_xcom(pod)
-        if isinstance(result, str) and result.rstrip() == 
"__airflow_xcom_result_empty__":
+        if isinstance(result, str) and result.rstrip() == EMPTY_XCOM_RESULT:
             self.log.info("xcom result file is empty.")
             return None
         else:
@@ -591,6 +592,7 @@ class KubernetesPodOperator(BaseOperator):
             return self.execute_sync(context)
 
     def execute_sync(self, context: Context):
+        result = None
         try:
             self.pod_request_obj = self.build_pod_request_obj(context)
             self.pod = self.get_or_create_pod(  # must set `self.pod` for 
`on_kill`
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index aff4970cd9..2559a13f24 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -50,6 +50,13 @@ if TYPE_CHECKING:
     from kubernetes.client.models.v1_pod import V1Pod
     from urllib3.response import HTTPResponse
 
+EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"
+"""
+Sentinel for no xcom result.
+
+:meta private:
+"""
+
 
 class PodLaunchFailedException(AirflowException):
     """When pod launching fails in KubernetesPodOperator."""
@@ -712,9 +719,11 @@ class PodManager(LoggingMixin):
         ) as resp:
             result = self._exec_pod_command(
                 resp,
-                f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then 
cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo 
__airflow_xcom_result_empty__; fi",
+                f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; "
+                f"then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; "
+                f"else echo {EMPTY_XCOM_RESULT}; fi",
             )
-            if result and result.rstrip() != "__airflow_xcom_result_empty__":
+            if result and result.rstrip() != EMPTY_XCOM_RESULT:
                 # Note: result string is parsed to check if its valid json.
                 # This function still returns a string which is converted into 
json in the calling method.
                 json.loads(result)

Reply via email to