jscheffl commented on code in PR #49867:
URL: https://github.com/apache/airflow/pull/49867#discussion_r2076157770


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -375,30 +378,68 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
         return self.run_pod_async(pod)
 
     def await_pod_start(
-        self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: 
int = 1
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 
120, check_interval: int = 1
     ) -> None:
         """
         Wait for the pod to reach phase other than ``Pending``.
 
         :param pod:
+        :param schedule_timeout: Timeout (in seconds) for pod stay in schedule 
state
+            (if pod is taking to long in schedule state, fails task)
         :param startup_timeout: Timeout (in seconds) for startup of the pod
-            (if pod is pending for too long, fails task)
-        :param startup_check_interval: Interval (in seconds) between checks
+            (if pod is pending for too long after being scheduled, fails task)
+        :param check_interval: Interval (in seconds) between checks
         :return:
         """
+        self.log.info("::group::Waiting until %ss to get the POD 
scheduled...", schedule_timeout)
+        pod_was_scheduled = False
         curr_time = time.time()
         while True:
             remote_pod = self.read_pod(pod)
-            if remote_pod.status.phase != PodPhase.PENDING:
+            pod_status = remote_pod.status
+            if pod_status.phase != PodPhase.PENDING:
+                self.keep_watching_for_events = False
+                self.log.info("::endgroup::")
                 break
-            self.log.warning("Pod not yet started: %s", pod.metadata.name)
-            if time.time() - curr_time >= startup_timeout:
-                msg = (
-                    f"Pod took longer than {startup_timeout} seconds to start. 
"
-                    "Check the pod events in kubernetes to determine why."
-                )
-                raise PodLaunchFailedException(msg)
-            time.sleep(startup_check_interval)
+
+            # Check for timeout
+            pod_conditions: list[V1PodCondition] = pod_status.conditions
+            if pod_conditions and any(
+                (condition.type == "PodScheduled" and condition.status == 
"True")
+                for condition in pod_conditions
+            ):
+                if not pod_was_scheduled:
+                    # POD was initially scheduled update timeout for getting 
POD launched
+                    pod_was_scheduled = True
+                    self.log.info("Waiting %ss to get the POD running...", 
startup_timeout)
+
+                if time.time() - curr_time >= startup_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to start. More than 
{startup_timeout}s. Check the pod events in kubernetes."
+                    )
+            else:
+                if time.time() - curr_time >= schedule_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to be scheduled on the cluster, 
giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
+                    )
+
+            # Check for general problems to terminate early - ErrImagePull
+            if pod_status.container_statuses:
+                for container_status_any in pod_status.container_statuses:
+                    container_status: V1ContainerStatus = container_status_any
+                    container_state: V1ContainerState = container_status.state
+                    container_waiting: V1ContainerStateWaiting | None = 
container_state.waiting
+                    if container_waiting:
+                        if container_waiting.reason in ["ErrImagePull", 
"InvalidImageName"]:

Review Comment:
   Do you know of any documentation listing these waiting reasons? I have only seen these two in my
environment.
   I just checked the API specs and the K8s client code... there is no enum defined that
would help :-(
   So I'd suggest using this list for now; if more reasons turn up, it can be extended in
the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to