jscheffl commented on code in PR #56875:
URL: https://github.com/apache/airflow/pull/56875#discussion_r2446083077


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -878,12 +878,15 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
         :param namespace: Name of the pod's namespace.
         """
         async with self.get_conn() as connection:
-            v1_api = async_client.CoreV1Api(connection)
-            pod: V1Pod = await v1_api.read_namespaced_pod(
-                name=name,
-                namespace=namespace,
-            )
-        return pod
+            try:
+                v1_api = async_client.CoreV1Api(connection)
+                pod: V1Pod = await v1_api.read_namespaced_pod(
+                    name=name,
+                    namespace=namespace,
+                )
+                return pod
+            except HTTPError as e:
+                raise AirflowException(f"There was an error reading the kubernetes API: {e}")

Review Comment:
   We recently decided in the community that we want to ban the use of `AirflowException` as it is very unspecific. Can you create a new K8s-provider-specific exception for this case and avoid using `AirflowException`? If all goes well, the static checks should flag this as well.
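   
   For illustration, a minimal sketch of what such an exception could look like (module path and class name are only suggestions; it could still subclass `AirflowException` for backwards compatibility):
   
   ```python
   # Hypothetical sketch, e.g. in airflow/providers/cncf/kubernetes/exceptions.py
   from airflow.exceptions import AirflowException
   
   
   class KubernetesApiError(AirflowException):
       """Raised when a call to the Kubernetes API fails in the CNCF Kubernetes provider."""
   
   
   # The hook would then raise the provider-specific exception instead of the generic one:
   #     raise KubernetesApiError(f"There was an error reading the kubernetes API: {e}") from e
   ```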



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py:
##########
@@ -287,6 +291,10 @@ def hook(self) -> AsyncKubernetesHook:
             cluster_context=self.cluster_context,
         )
 
+    @cached_property
+    def pod_manager(self) -> AsyncPodManager:
+        return AsyncPodManager(async_hook=self.hook)  # , callbacks=self.callbacks)

Review Comment:
   Commented-out code - is this an editorial leftover, or is it still needed?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py:
##########
@@ -932,6 +935,19 @@ async def read_logs(self, name: str, namespace: str):
                 self.log.exception("There was an error reading the kubernetes API.")
                 raise
 
+    async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
+        """Get pod's events."""
+        async with self.get_conn() as connection:
+            try:
+                v1_api = async_client.CoreV1Api(connection)
+                events: CoreV1EventList = await v1_api.list_namespaced_event(
+                    field_selector=f"involvedObject.name={name}",
+                    namespace=namespace,
+                )
+                return events
+            except HTTPError as e:
+                raise AirflowException(f"There was an error reading the kubernetes API: {e}")

Review Comment:
   +1 as above



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -99,6 +101,92 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
 
 
+async def generic_watch_pod_events(
+    self,
+    pod: V1Pod,
+    check_interval: float = 1,
+    is_async: bool = True,
+) -> None:
+    """Read pod events and writes into log."""
+    num_events = 0
+    while not self.stop_watching_events:
+        events = await self.read_pod_events(pod) if is_async else self.read_pod_events(pod)
+        for new_event in events.items[num_events:]:
+            involved_object: V1ObjectReference = new_event.involved_object
+            self.log.info("The Pod has an Event: %s from %s", new_event.message, involved_object.field_path)
+        num_events = len(events.items)
+        await asyncio.sleep(check_interval)
+
+
+async def generic_await_pod_start(
+    self,
+    pod,
+    schedule_timeout: int = 120,
+    startup_timeout: int = 120,
+    check_interval: float = 1,
+    is_async: bool = True,
+):
+    """
+    Monitor the startup phase of a Kubernetes pod, waiting for it to leave the ``Pending`` state.
+
+    This function is shared by both PodManager and AsyncPodManager to provide consistent pod startup tracking.
+
+    :param pod: The pod object to monitor.
+    :param schedule_timeout: Maximum time (in seconds) to wait for the pod to be scheduled.
+    :param startup_timeout: Maximum time (in seconds) to wait for the pod to start running after being scheduled.
+    :param check_interval: Interval (in seconds) between status checks.
+    :param is_async: Set to True if called in an async context; otherwise, False.
+    """
+    self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout)
+    pod_was_scheduled = False
+    start_check_time = time.time()
+    while True:
+        remote_pod = await self.read_pod(pod) if is_async else self.read_pod(pod)
+        pod_status = remote_pod.status
+        if pod_status.phase != PodPhase.PENDING:
+            self.stop_watching_events = True

Review Comment:
   `stop_watching_events` belongs to AsyncPodManager, so I assume this does not work as it is modelled here? The function definition takes a `self` parameter but is not a member of a class.
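   
   One possible shape (just a rough sketch; `PodStartupMixin` and `_read_pod_events` are made-up names) would be to move the shared logic into a mixin that both managers inherit from, so that `self.stop_watching_events` and `self.log` resolve on the concrete manager:
   
   ```python
   import asyncio
   
   
   class PodStartupMixin:
       """Hypothetical shared base for PodManager and AsyncPodManager."""
   
       stop_watching_events: bool = False
   
       async def watch_pod_events(self, pod, check_interval: float = 1) -> None:
           """Read pod events and write them to the log until told to stop."""
           num_events = 0
           while not self.stop_watching_events:
               # Each concrete manager would implement _read_pod_events
               # (wrapping the sync hook or awaiting the async hook).
               events = await self._read_pod_events(pod)
               for new_event in events.items[num_events:]:
                   self.log.info(
                       "The Pod has an Event: %s from %s",
                       new_event.message,
                       new_event.involved_object.field_path,
                   )
               num_events = len(events.items)
               await asyncio.sleep(check_interval)
   ```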



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py:
##########
@@ -110,11 +113,12 @@ def __init__(
         self.get_logs = get_logs
         self.startup_timeout = startup_timeout
         self.startup_check_interval = startup_check_interval
+        # New parameter startup_timeout_seconds adds breaking change, to handle this as smooth as possible just reuse startup time

Review Comment:
   I understand this as a comment on this PR, but after the PR is merged the temporal framing is misleading. After release it is not a breaking change.
   
   We classify a "breaking change" as something that fundamentally changes the API. We did this on the sync interface in the past (as far as I remember). Is this not just a change in behavior, or do you see a breaking API change here?



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -99,6 +101,92 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
 
 
+async def generic_watch_pod_events(
+    self,
+    pod: V1Pod,
+    check_interval: float = 1,
+    is_async: bool = True,
+) -> None:
+    """Read pod events and writes into log."""
+    num_events = 0
+    while not self.stop_watching_events:
+        events = await self.read_pod_events(pod) if is_async else self.read_pod_events(pod)
+        for new_event in events.items[num_events:]:
+            involved_object: V1ObjectReference = new_event.involved_object
+            self.log.info("The Pod has an Event: %s from %s", new_event.message, involved_object.field_path)
+        num_events = len(events.items)
+        await asyncio.sleep(check_interval)
+
+
+async def generic_await_pod_start(

Review Comment:
   +1 with generic naming



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -99,6 +101,92 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
 
 
+async def generic_watch_pod_events(

Review Comment:
   Can you add some pydoc explaining why this is called "generic" and not just `watch_pod_events()`? If there is no particular reason, I'd favor the name without "generic".



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py:
##########
@@ -700,6 +701,142 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running):
         mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar")
 
 
+class TestAsyncPodManager:
+    def setup_method(self):
+        self.mock_async_hook = mock.AsyncMock()
+        self.async_pod_manager = AsyncPodManager(
+            async_hook=self.mock_async_hook,
+            callbacks=[],
+        )
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = "Pod took too long to be scheduled on the cluster, giving up. More than 0s. Check the pod events in kubernetes."
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=0,
+                startup_timeout=0,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_informative_error_on_startup_timeout(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        condition = mock.MagicMock()
+        condition.type = "PodScheduled"
+        condition.status = "True"
+        pod_response.status.conditions = [condition]
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = "Pod took too long to start. More than 0s. Check the pod events in kubernetes."
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=0,
+                startup_timeout=0,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_fast_error_on_image_error(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        container_status = mock.MagicMock()
+        waiting_state = mock.MagicMock()
+        waiting_state.reason = "ErrImagePull"
+        waiting_state.message = "Test error"
+        container_status.state.waiting = waiting_state
+        pod_response.status.container_statuses = [container_status]
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = f"Pod docker image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}"
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=60,
+                startup_timeout=60,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):

Review Comment:
   `caplog` here as well as above.



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -99,6 +101,92 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
 
 
+async def generic_watch_pod_events(
+    self,
+    pod: V1Pod,
+    check_interval: float = 1,
+    is_async: bool = True,
+) -> None:
+    """Read pod events and writes into log."""
+    num_events = 0
+    while not self.stop_watching_events:
+        events = await self.read_pod_events(pod) if is_async else self.read_pod_events(pod)

Review Comment:
   Why are you reading from the same function in different ways depending on async or not? Would it not be the same in both cases?
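   
   For example (sketch only, assuming `read_pod_events` either returns the event list directly or returns an awaitable), the `is_async` branching could be dropped:
   
   ```python
   import asyncio
   import inspect
   
   
   async def watch_pod_events(self, pod, check_interval: float = 1) -> None:
       """Read pod events and write them to the log."""
       num_events = 0
       while not self.stop_watching_events:
           result = self.read_pod_events(pod)
           # Await only when the manager returned an awaitable (async case); use it directly otherwise.
           events = await result if inspect.isawaitable(result) else result
           for new_event in events.items[num_events:]:
               self.log.info(
                   "The Pod has an Event: %s from %s",
                   new_event.message,
                   new_event.involved_object.field_path,
               )
           num_events = len(events.items)
           await asyncio.sleep(check_interval)
   ```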



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -319,14 +335,48 @@ def test_define_container_state_should_execute_successfully(
 
         assert expected_state == trigger.define_container_state(pod)
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger, caplog):

Review Comment:
   We aim to ban `caplog` from our pytest tests, as we have seen problems with it multiple times. Can you please use other means than grepping logs, e.g. mocking the logger to grab the messages, or better, use something other than logging for validation?
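   
   For example, a rough sketch of the pattern (the patched logger and the asserted message are placeholders; depending on where the message is emitted, you may need to patch the pod manager's logger rather than the trigger's):
   
   ```python
   from unittest import mock
   
   import pytest
   
   
   # Inside the existing test class:
   @pytest.mark.asyncio
   @mock.patch(f"{TRIGGER_PATH}.define_container_state")
   @mock.patch(f"{TRIGGER_PATH}.hook")
   async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger):
       # ... keep the existing mock_hook / mock_method setup ...
       with mock.patch.object(trigger.log, "info") as mock_log_info:
           # ... drive the trigger loop exactly as the current test does ...
           # Assert on the logger calls instead of grepping caplog records:
           mock_log_info.assert_any_call("The Pod has an Event: %s from %s", mock.ANY, mock.ANY)
   ```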



##########
providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py:
##########
@@ -319,14 +335,48 @@ def test_define_container_state_should_execute_successfully(
 
         assert expected_state == trigger.define_container_state(pod)
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger, caplog):

Review Comment:
   (Same for other usages below as well)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
