jscheffl commented on code in PR #56875:
URL: https://github.com/apache/airflow/pull/56875#discussion_r2449427519
##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -99,6 +101,92 @@ def check_exception_is_kubernetes_api_unauthorized(exc:
BaseException):
return isinstance(exc, ApiException) and exc.status and str(exc.status) ==
"401"
+async def generic_watch_pod_events(
+ self,
+ pod: V1Pod,
+ check_interval: float = 1,
+ is_async: bool = True,
+) -> None:
+ """Read pod events and writes into log."""
+ num_events = 0
+ while not self.stop_watching_events:
+ events = await self.read_pod_events(pod) if is_async else
self.read_pod_events(pod)
+ for new_event in events.items[num_events:]:
+ involved_object: V1ObjectReference = new_event.involved_object
+ self.log.info("The Pod has an Event: %s from %s",
new_event.message, involved_object.field_path)
+ num_events = len(events.items)
+ await asyncio.sleep(check_interval)
+
+
+async def generic_await_pod_start(
+ self,
+ pod,
+ schedule_timeout: int = 120,
+ startup_timeout: int = 120,
+ check_interval: float = 1,
+ is_async: bool = True,
+):
+ """
+ Monitor the startup phase of a Kubernetes pod, waiting for it to leave the
``Pending`` state.
+
+ This function is shared by both PodManager and AsyncPodManager to provide
consistent pod startup tracking.
+
+ :param pod: The pod object to monitor.
+ :param schedule_timeout: Maximum time (in seconds) to wait for the pod to
be scheduled.
+ :param startup_timeout: Maximum time (in seconds) to wait for the pod to
start running after being scheduled.
+ :param check_interval: Interval (in seconds) between status checks.
+ :param is_async: Set to True if called in an async context; otherwise,
False.
+ """
+ self.log.info("::group::Waiting until %ss to get the POD scheduled...",
schedule_timeout)
+ pod_was_scheduled = False
+ start_check_time = time.time()
+ while True:
+ remote_pod = await self.read_pod(pod) if is_async else
self.read_pod(pod)
+ pod_status = remote_pod.status
+ if pod_status.phase != PodPhase.PENDING:
+ self.stop_watching_events = True
Review Comment:
Aaaah, yeah with the comment from before together this makes sense now.
Thanks for the re-work!
Otherwise besides using a function an alternative modelling could also have
been to implement the common code via a MixIn. But now it is clear to me.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]