This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c67a2b0413 Use constant for empty xcom result sentinel (#35451)
c67a2b0413 is described below
commit c67a2b0413994e799ed3f1969d7f0194683cba13
Author: Daniel Standish <[email protected]>
AuthorDate: Sun Nov 5 07:47:54 2023 -0800
Use constant for empty xcom result sentinel (#35451)
Define things in one place.
---
airflow/providers/cncf/kubernetes/operators/pod.py | 4 +++-
airflow/providers/cncf/kubernetes/utils/pod_manager.py | 13 +++++++++++--
2 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 56abd98350..5b58b1bedb 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -55,6 +55,7 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger
from airflow.providers.cncf.kubernetes.utils import xcom_sidecar # type: ignore[attr-defined]
from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+ EMPTY_XCOM_RESULT,
OnFinishAction,
PodLaunchFailedException,
PodManager,
@@ -576,7 +577,7 @@ class KubernetesPodOperator(BaseOperator):
def extract_xcom(self, pod: k8s.V1Pod):
"""Retrieve xcom value and kill xcom sidecar container."""
result = self.pod_manager.extract_xcom(pod)
- if isinstance(result, str) and result.rstrip() == "__airflow_xcom_result_empty__":
+ if isinstance(result, str) and result.rstrip() == EMPTY_XCOM_RESULT:
self.log.info("xcom result file is empty.")
return None
else:
@@ -591,6 +592,7 @@ class KubernetesPodOperator(BaseOperator):
return self.execute_sync(context)
def execute_sync(self, context: Context):
+ result = None
try:
self.pod_request_obj = self.build_pod_request_obj(context)
self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill`
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index aff4970cd9..2559a13f24 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -50,6 +50,13 @@ if TYPE_CHECKING:
from kubernetes.client.models.v1_pod import V1Pod
from urllib3.response import HTTPResponse
+EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"
+"""
+Sentinel for no xcom result.
+
+:meta private:
+"""
+
class PodLaunchFailedException(AirflowException):
"""When pod launching fails in KubernetesPodOperator."""
@@ -712,9 +719,11 @@ class PodManager(LoggingMixin):
) as resp:
result = self._exec_pod_command(
resp,
- f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo __airflow_xcom_result_empty__; fi",
+ f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; "
+ f"then cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; "
+ f"else echo {EMPTY_XCOM_RESULT}; fi",
)
- if result and result.rstrip() != "__airflow_xcom_result_empty__":
+ if result and result.rstrip() != EMPTY_XCOM_RESULT:
# Note: result string is parsed to check if its valid json.
# This function still returns a string which is converted into json in the calling method.
json.loads(result)