This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0120515f6c Added unit tests and restructured `await_xcom_sidecar_container_start` method. (#42504)
0120515f6c is described below

commit 0120515f6c0061711feba4990cfc61db47a5b4f0
Author: harjeevan maan <[email protected]>
AuthorDate: Thu Oct 3 22:02:54 2024 -0400

    Added unit tests and restructured `await_xcom_sidecar_container_start` method. (#42504)
    
    * Added unit tests and restructured `await_xcom_sidecar_container_start` method.
    
    - The `await_xcom_sidecar_container_start` method in `PodManager` now polls once per second until the xcom sidecar container is running, so that `do_xcom_push` is only executed after the sidecar has started.
    - While waiting, it logs the elapsed time periodically (every `log_interval` seconds, default 30) and raises an `AirflowException` if the container does not start within `timeout` seconds (default 900).
    - Added two unit tests:
      - `test_await_xcom_sidecar_container_timeout`: verifies that an `AirflowException` is raised if the sidecar container fails to start within the timeout.
      - `test_await_xcom_sidecar_container_starts`: confirms that the method returns successfully once the sidecar container is running.
    
    * Fixed the failing test assertions
---
 .../providers/cncf/kubernetes/utils/pod_manager.py | 26 +++++++++++++++++-----
 .../cncf/kubernetes/utils/test_pod_manager.py      | 15 +++++++++++++
 2 files changed, 35 insertions(+), 6 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/utils/pod_manager.py 
b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 7c283eaccc..cd91dc0928 100644
--- a/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -19,7 +19,6 @@
 from __future__ import annotations
 
 import enum
-import itertools
 import json
 import math
 import time
@@ -721,14 +720,29 @@ class PodManager(LoggingMixin):
         except HTTPError as e:
             raise AirflowException(f"There was an error reading the kubernetes 
API: {e}")
 
-    def await_xcom_sidecar_container_start(self, pod: V1Pod) -> None:
+    def await_xcom_sidecar_container_start(
+        self, pod: V1Pod, timeout: int = 900, log_interval: int = 30
+    ) -> None:
+        """Check if the sidecar container has reached the 'Running' state 
before performing do_xcom_push."""
         self.log.info("Checking if xcom sidecar container is started.")
-        for attempt in itertools.count():
+        start_time = time.time()
+        last_log_time = start_time
+
+        while True:
+            elapsed_time = time.time() - start_time
             if self.container_is_running(pod, 
PodDefaults.SIDECAR_CONTAINER_NAME):
-                self.log.info("The xcom sidecar container is started.")
+                self.log.info("The xcom sidecar container has started.")
                 break
-            if not attempt:
-                self.log.warning("The xcom sidecar container is not yet 
started.")
+            if (time.time() - last_log_time) >= log_interval:
+                self.log.warning(
+                    "Still waiting for the xcom sidecar container to start. 
Elapsed time: %d seconds.",
+                    int(elapsed_time),
+                )
+                last_log_time = time.time()
+            if elapsed_time > timeout:
+                raise AirflowException(
+                    f"Xcom sidecar container did not start within {timeout // 
60} minutes."
+                )
             time.sleep(1)
 
     def extract_xcom(self, pod: V1Pod) -> str:
diff --git a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py 
b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
index 3e4f2d086f..73dac5255d 100644
--- a/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
+++ b/tests/providers/cncf/kubernetes/utils/test_pod_manager.py
@@ -601,6 +601,21 @@ class TestPodManager:
             self.pod_manager.extract_xcom(pod=mock_pod)
         assert mock_exec_xcom_kill.call_count == 1
 
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    def test_await_xcom_sidecar_container_timeout(self, 
mock_container_is_running):
+        mock_pod = MagicMock()
+        mock_container_is_running.return_value = False
+        with pytest.raises(AirflowException):
+            self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod, 
timeout=10, log_interval=5)
+        mock_container_is_running.assert_any_call(mock_pod, 
"airflow-xcom-sidecar")
+
+    
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
+    def test_await_xcom_sidecar_container_starts(self, 
mock_container_is_running):
+        mock_pod = MagicMock()
+        mock_container_is_running.return_value = True
+        self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod)
+        mock_container_is_running.assert_any_call(mock_pod, 
"airflow-xcom-sidecar")
+
 
 def params_for_test_container_is_running():
     """The `container_is_running` method is designed to handle an assortment 
of bad objects

Reply via email to