This is an automated email from the ASF dual-hosted git repository.
eladkal pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new df4c8837d0 Fix KubernetesPodOperator validate xcom json and add
retries (#32113)
df4c8837d0 is described below
commit df4c8837d022e66921bc0cf33f3249b235de6fdd
Author: Ashwin Agate <[email protected]>
AuthorDate: Sat Jul 1 02:43:48 2023 -0400
Fix KubernetesPodOperator validate xcom json and add retries (#32113)
* Fix KubernetesPodOperator validate xcom json and add retries
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 42 ++++++++++++++++++-
.../cncf/kubernetes/utils/test_pod_manager.py | 48 ++++++++++++++++++++++
2 files changed, 89 insertions(+), 1 deletion(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 71b5e17127..2251f1d438 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -545,6 +545,19 @@ class PodManager(LoggingMixin):
def extract_xcom(self, pod: V1Pod) -> str:
"""Retrieves XCom value and kills xcom sidecar container."""
+ try:
+ result = self.extract_xcom_json(pod)
+ return result
+ finally:
+ self.extract_xcom_kill(pod)
+
+ @tenacity.retry(
+ stop=tenacity.stop_after_attempt(5),
+ wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
+ reraise=True,
+ )
+ def extract_xcom_json(self, pod: V1Pod) -> str:
+ """Retrieves XCom value and also checks if xcom json is valid."""
with closing(
kubernetes_stream(
self._client.connect_get_namespaced_pod_exec,
@@ -563,11 +576,38 @@ class PodManager(LoggingMixin):
resp,
f"if [ -s {PodDefaults.XCOM_MOUNT_PATH}/return.json ]; then
cat {PodDefaults.XCOM_MOUNT_PATH}/return.json; else echo
__airflow_xcom_result_empty__; fi", # noqa
)
- self._exec_pod_command(resp, "kill -s SIGINT 1")
+ if result and result.rstrip() != "__airflow_xcom_result_empty__":
+ # Note: result string is parsed to check if its valid json.
+ # This function still returns a string which is converted into
json in the calling method.
+ json.loads(result)
+
if result is None:
raise AirflowException(f"Failed to extract xcom from pod:
{pod.metadata.name}")
return result
+ @tenacity.retry(
+ stop=tenacity.stop_after_attempt(5),
+ wait=tenacity.wait_exponential(multiplier=1, min=4, max=10),
+ reraise=True,
+ )
+ def extract_xcom_kill(self, pod: V1Pod):
+ """Kills xcom sidecar container."""
+ with closing(
+ kubernetes_stream(
+ self._client.connect_get_namespaced_pod_exec,
+ pod.metadata.name,
+ pod.metadata.namespace,
+ container=PodDefaults.SIDECAR_CONTAINER_NAME,
+ command=["/bin/sh"],
+ stdin=True,
+ stdout=True,
+ stderr=True,
+ tty=False,
+ _preload_content=False,
+ )
+ ) as resp:
+ self._exec_pod_command(resp, "kill -s SIGINT 1")
+
def _exec_pod_command(self, resp, command: str) -> str | None:
res = None
if resp.is_open():
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index a55be38a5e..8f28d33dfd 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -18,6 +18,7 @@ from __future__ import annotations
import logging
from datetime import datetime
+from json.decoder import JSONDecodeError
from unittest import mock
from unittest.mock import MagicMock
@@ -370,6 +371,53 @@ class TestPodManager:
pod_info.status.container_statuses = [container_status]
assert container_is_terminated(pod_info, "base") ==
expected_is_terminated
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
+ def test_extract_xcom_success(self, mock_exec_xcom_kill,
mock_exec_pod_command, mock_kubernetes_stream):
+ """test when valid json is retrieved from xcom sidecar container."""
+ xcom_json = """{"a": "true"}"""
+ mock_pod = MagicMock()
+ mock_exec_pod_command.return_value = xcom_json
+ ret = self.pod_manager.extract_xcom(pod=mock_pod)
+ assert ret == xcom_json
+ assert mock_exec_xcom_kill.call_count == 1
+
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
+ def test_extract_xcom_failure(self, mock_exec_xcom_kill,
mock_exec_pod_command, mock_kubernetes_stream):
+ """test when invalid json is retrieved from xcom sidecar container."""
+ with pytest.raises(JSONDecodeError):
+ xcom_json = """{"a": "tru"""
+ mock_pod = MagicMock()
+ mock_exec_pod_command.return_value = xcom_json
+ self.pod_manager.extract_xcom(pod=mock_pod)
+ assert mock_exec_xcom_kill.call_count == 1
+
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
+ def test_extract_xcom_empty(self, mock_exec_xcom_kill,
mock_exec_pod_command, mock_kubernetes_stream):
+ """test when __airflow_xcom_result_empty__ is retrieved from xcom
sidecar container."""
+ mock_pod = MagicMock()
+ xcom_result = "__airflow_xcom_result_empty__"
+ mock_exec_pod_command.return_value = xcom_result
+ ret = self.pod_manager.extract_xcom(pod=mock_pod)
+ assert ret == xcom_result
+ assert mock_exec_xcom_kill.call_count == 1
+
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager._exec_pod_command")
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill")
+ def test_extract_xcom_none(self, mock_exec_xcom_kill,
mock_exec_pod_command, mock_kubernetes_stream):
+ """test when None is retrieved from xcom sidecar container."""
+ with pytest.raises(AirflowException):
+ mock_pod = MagicMock()
+ mock_exec_pod_command.return_value = None
+ self.pod_manager.extract_xcom(pod=mock_pod)
+ assert mock_exec_xcom_kill.call_count == 1
+
def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment
of bad objects