nevcohen commented on code in PR #49867:
URL: https://github.com/apache/airflow/pull/49867#discussion_r2075990488


##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -375,30 +378,68 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
         return self.run_pod_async(pod)
 
     def await_pod_start(
-        self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: 
int = 1
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 
120, check_interval: int = 1
     ) -> None:
         """
         Wait for the pod to reach phase other than ``Pending``.
 
         :param pod:
+        :param schedule_timeout: Timeout (in seconds) for pod stay in schedule 
state
+            (if pod is taking to long in schedule state, fails task)
         :param startup_timeout: Timeout (in seconds) for startup of the pod
-            (if pod is pending for too long, fails task)
-        :param startup_check_interval: Interval (in seconds) between checks
+            (if pod is pending for too long after being scheduled, fails task)
+        :param check_interval: Interval (in seconds) between checks
         :return:
         """
+        self.log.info("::group::Waiting until %ss to get the POD 
scheduled...", schedule_timeout)
+        pod_was_scheduled = False
         curr_time = time.time()
         while True:
             remote_pod = self.read_pod(pod)
-            if remote_pod.status.phase != PodPhase.PENDING:
+            pod_status = remote_pod.status
+            if pod_status.phase != PodPhase.PENDING:
+                self.keep_watching_for_events = False
+                self.log.info("::endgroup::")
                 break
-            self.log.warning("Pod not yet started: %s", pod.metadata.name)
-            if time.time() - curr_time >= startup_timeout:
-                msg = (
-                    f"Pod took longer than {startup_timeout} seconds to start. 
"
-                    "Check the pod events in kubernetes to determine why."
-                )
-                raise PodLaunchFailedException(msg)
-            time.sleep(startup_check_interval)
+
+            # Check for timeout
+            pod_conditions: list[V1PodCondition] = pod_status.conditions
+            if pod_conditions and any(
+                (condition.type == "PodScheduled" and condition.status == 
"True")
+                for condition in pod_conditions
+            ):
+                if not pod_was_scheduled:
+                    # POD was initially scheduled update timeout for getting 
POD launched
+                    pod_was_scheduled = True
+                    self.log.info("Waiting %ss to get the POD running...", 
startup_timeout)
+
+                if time.time() - curr_time >= startup_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to start. More than 
{startup_timeout}s. Check the pod events in kubernetes."
+                    )
+            else:
+                if time.time() - curr_time >= schedule_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to be scheduled on the cluster, 
giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
+                    )
+
+            # Check for general problems to terminate early - ErrImagePull
+            if pod_status.container_statuses:
+                for container_status_any in pod_status.container_statuses:
+                    container_status: V1ContainerStatus = container_status_any

Review Comment:
   You can do this instead:
   
   ```py
   container_status: V1ContainerStatus
   for container_status in pod_status.container_statuses:
       container_state: V1ContainerState = container_status.state
       pass
   ```



##########
providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py:
##########
@@ -375,30 +378,68 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
         return self.run_pod_async(pod)
 
     def await_pod_start(
-        self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: 
int = 1
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 
120, check_interval: int = 1
     ) -> None:
         """
         Wait for the pod to reach phase other than ``Pending``.
 
         :param pod:
+        :param schedule_timeout: Timeout (in seconds) for pod stay in schedule 
state
+            (if pod is taking to long in schedule state, fails task)
         :param startup_timeout: Timeout (in seconds) for startup of the pod
-            (if pod is pending for too long, fails task)
-        :param startup_check_interval: Interval (in seconds) between checks
+            (if pod is pending for too long after being scheduled, fails task)
+        :param check_interval: Interval (in seconds) between checks
         :return:
         """
+        self.log.info("::group::Waiting until %ss to get the POD 
scheduled...", schedule_timeout)
+        pod_was_scheduled = False
         curr_time = time.time()
         while True:
             remote_pod = self.read_pod(pod)
-            if remote_pod.status.phase != PodPhase.PENDING:
+            pod_status = remote_pod.status
+            if pod_status.phase != PodPhase.PENDING:
+                self.keep_watching_for_events = False
+                self.log.info("::endgroup::")
                 break
-            self.log.warning("Pod not yet started: %s", pod.metadata.name)
-            if time.time() - curr_time >= startup_timeout:
-                msg = (
-                    f"Pod took longer than {startup_timeout} seconds to start. 
"
-                    "Check the pod events in kubernetes to determine why."
-                )
-                raise PodLaunchFailedException(msg)
-            time.sleep(startup_check_interval)
+
+            # Check for timeout
+            pod_conditions: list[V1PodCondition] = pod_status.conditions
+            if pod_conditions and any(
+                (condition.type == "PodScheduled" and condition.status == 
"True")
+                for condition in pod_conditions
+            ):
+                if not pod_was_scheduled:
+                    # POD was initially scheduled update timeout for getting 
POD launched
+                    pod_was_scheduled = True
+                    self.log.info("Waiting %ss to get the POD running...", 
startup_timeout)
+
+                if time.time() - curr_time >= startup_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to start. More than 
{startup_timeout}s. Check the pod events in kubernetes."
+                    )
+            else:
+                if time.time() - curr_time >= schedule_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to be scheduled on the cluster, 
giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
+                    )
+
+            # Check for general problems to terminate early - ErrImagePull
+            if pod_status.container_statuses:
+                for container_status_any in pod_status.container_statuses:
+                    container_status: V1ContainerStatus = container_status_any
+                    container_state: V1ContainerState = container_status.state
+                    container_waiting: V1ContainerStateWaiting | None = 
container_state.waiting
+                    if container_waiting:
+                        if container_waiting.reason in ["ErrImagePull", 
"InvalidImageName"]:

Review Comment:
   Are these values can differ between different Kubernetes versions that 
Airflow supports?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to