This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 0120515f6c Added unit tests and restructured
`await_xcom_sidecar_container_start` method. (#42504)
0120515f6c is described below
commit 0120515f6c0061711feba4990cfc61db47a5b4f0
Author: harjeevan maan <[email protected]>
AuthorDate: Thu Oct 3 22:02:54 2024 -0400
Added unit tests and restructured `await_xcom_sidecar_container_start`
method. (#42504)
* Added unit tests and restructured `await_xcom_sidecar_container_start`
method.
- The `await_xcom_sidecar_container_start` method in `PodManager` checks if
the xcom sidecar container has started running before executing `do_xcom_push`.
- The function logs the status periodically and raises an
`AirflowException` if the container does not start within the specified timeout.
- Added two unit tests:
- `test_await_xcom_sidecar_container_timeout`: Verifies that an
`AirflowException` is raised if the sidecar container fails to start within the
timeout.
- `test_await_xcom_sidecar_container_starts`: Confirms the method
successfully exits when the sidecar container starts.
* Fixed the assertion test failures
---
.../providers/cncf/kubernetes/utils/pod_manager.py | 26 +++++++++++++++++-----
.../cncf/kubernetes/utils/test_pod_manager.py | 15 +++++++++++++
2 files changed, 35 insertions(+), 6 deletions(-)
diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 7c283eaccc..cd91dc0928 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -19,7 +19,6 @@
from __future__ import annotations
import enum
-import itertools
import json
import math
import time
@@ -721,14 +720,29 @@ class PodManager(LoggingMixin):
except HTTPError as e:
raise AirflowException(f"There was an error reading the kubernetes
API: {e}")
- def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
+ def await_xcom_sidecar_container_start(
+ self, pod: V1Pod, timeout: int = 900, log_interval: int = 30
+ ) -> None:
+ """Check if the sidecar container has reached the 'Running' state
before performing do_xcom_push."""
self.log.info("Checking if xcom sidecar container is started.")
- for attempt in itertools.count():
+ start_time = time.time()
+ last_log_time = start_time
+
+ while True:
+ elapsed_time = time.time() - start_time
if self.container_is_running(pod,
PodDefaults.SIDECAR_CONTAINER_NAME):
- self.log.info("The xcom sidecar container is started.")
+ self.log.info("The xcom sidecar container has started.")
break
- if not attempt:
- self.log.warning("The xcom sidecar container is not yet
started.")
+ if (time.time() - last_log_time) >= log_interval:
+ self.log.warning(
+ "Still waiting for the xcom sidecar container to start.
Elapsed time: %d seconds.",
+ int(elapsed_time),
+ )
+ last_log_time = time.time()
+ if elapsed_time > timeout:
+ raise AirflowException(
+ f"Xcom sidecar container did not start within {timeout //
60} minutes."
+ )
time.sleep(1)
def extract_xcom(self, pod: V1Pod) -> str:
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 3e4f2d086f..73dac5255d 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -601,6 +601,21 @@ class TestPodManager:
self.pod_manager.extract_xcom(pod=mock_pod)
assert mock_exec_xcom_kill.call_count == 1
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+ def test_await_xcom_sidecar_container_timeout(self,
mock_container_is_running):
+ mock_pod = MagicMock()
+ mock_container_is_running.return_value = False
+ with pytest.raises(AirflowException):
+ self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod,
timeout=10, log_interval=5)
+ mock_container_is_running.assert_any_call(mock_pod,
"airflow-xcom-sidecar")
+
+
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+ def test_await_xcom_sidecar_container_starts(self,
mock_container_is_running):
+ mock_pod = MagicMock()
+ mock_container_is_running.return_value = True
+ self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod)
+ mock_container_is_running.assert_any_call(mock_pod,
"airflow-xcom-sidecar")
+
def params_for_test_container_is_running():
"""The `container_is_running` method is designed to handle an assortment
of bad objects