jedcunningham commented on a change in pull request #15336:
URL: https://github.com/apache/airflow/pull/15336#discussion_r612504746



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -187,25 +190,35 @@ def process_status(
         self,
         pod_id: str,
         namespace: str,
-        status: str,
+        status: V1PodStatus,

Review comment:
       Sorry, I led you slightly astray earlier... This will lead to fewer imports:
   
   ```suggestion
           status: k8s.V1PodStatus,
   ```

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -507,3 +506,37 @@ def test_process_status_catchall(self):
 
         self._run()
         self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_container_status_of_waiting_with_errimagepull_fails_pod(self):
+        self.pod.status.phase = "Pending"
+        self.pod.status.container_statuses = [
+            k8s.V1ContainerStatus(
+                container_id=None,
+                image="apache/airflow:2.0.1-python3.8",
+                image_id="",
+                name="base",
+                ready="false",
+                restart_count=0,
+                state=k8s.V1ContainerState(waiting=k8s.V1ContainerStateWaiting(reason='ErrImagePull')),
+            )
+        ]
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+        self._run()
+        self.watcher.watcher_queue.put.assert_called()

Review comment:
       ```suggestion
           self.assert_watcher_queue_called_once_with_state(State.FAILED)
   ```
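
For context, a minimal sketch of how a helper like this could be written with `unittest.mock`. The function name, the `FAILED` stand-in, and the assumption that the state sits at index 2 of the queued tuple are illustrative (the index is taken from the `watcher_queue.put(...)` call in this PR); this is not the helper from the test suite.

```python
from unittest import mock

FAILED = "failed"  # stand-in for State.FAILED, assumed for this sketch


def assert_queue_called_once_with_state(queue_mock, state):
    """Assert put() was called exactly once and the queued tuple carries `state`."""
    queue_mock.put.assert_called_once()
    # put() receives one tuple:
    # (pod_id, namespace, state, annotations, resource_version)
    (item,), _kwargs = queue_mock.put.call_args
    assert item[2] == state


queue = mock.MagicMock()
queue.put(("pod-1", "default", FAILED, {}, "1"))
assert_queue_called_once_with_state(queue, FAILED)
```

Unlike a bare `assert_called()`, this fails if the pod is queued more than once or with a different state.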

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -187,25 +190,35 @@ def process_status(
         self,
         pod_id: str,
         namespace: str,
-        status: str,
+        status: V1PodStatus,
         annotations: Dict[str, str],
         resource_version: str,
         event: Any,
     ) -> None:
         """Process status response"""
-        if status == 'Pending':
-            if event['type'] == 'DELETED':
+        pod_status = status.phase
+        if pod_status == 'Pending':
+            # Check container statuses
+            container_statuses = status.container_statuses
+            init_container_statuses = status.init_container_statuses
+            if container_statuses and self._container_image_pull_err(container_statuses):
+                self.log.info('Event: Failed to start pod %s, a container has an ErrImagePull', pod_id)
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))

Review comment:
       Yeah, which is why #15263 deletes the pod. I think that is all we can do, unfortunately.
   
   That said, should we be failing these after the first pull failure? It is certainly something that could self-resolve, and we haven't actually started the task yet 🤷‍♂️. I'm thinking a timeout would be a more general catch-all for pods stuck in Pending.
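
A rough sketch of the timeout idea, to make it concrete. The helper name and the `pending_timeout` default are hypothetical and not part of this PR; it also assumes the caller has a timezone-aware start time for the pod (for example its creation timestamp).

```python
from datetime import datetime, timedelta, timezone


def pending_too_long(start_time, now, pending_timeout=timedelta(minutes=5)):
    """Return True if a pod has been Pending longer than the allowed timeout."""
    return now - start_time > pending_timeout


created = datetime(2021, 4, 13, 12, 0, tzinfo=timezone.utc)
print(pending_too_long(created, created + timedelta(minutes=1)))   # False
print(pending_too_long(created, created + timedelta(minutes=10)))  # True
```

With a check like this, a transient `ErrImagePull` gets a chance to resolve itself, while a pod that never leaves Pending is still failed eventually.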

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -507,3 +506,37 @@ def test_process_status_catchall(self):
 
         self._run()
         self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_container_status_of_waiting_with_errimagepull_fails_pod(self):
+        self.pod.status.phase = "Pending"
+        self.pod.status.container_statuses = [
+            k8s.V1ContainerStatus(
+                container_id=None,
+                image="apache/airflow:2.0.1-python3.8",
+                image_id="",
+                name="base",
+                ready="false",
+                restart_count=0,
+                state=k8s.V1ContainerState(waiting=k8s.V1ContainerStateWaiting(reason='ErrImagePull')),
+            )
+        ]
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+        self._run()
+        self.watcher.watcher_queue.put.assert_called()
+
+    def test_init_container_status_of_waiting_with_errimagepull_fails_pod(self):
+        self.pod.status.phase = "Pending"
+        self.pod.status.init_container_statuses = [
+            k8s.V1ContainerStatus(
+                container_id=None,
+                image="apache/airflow:2.0.1-python3.8",
+                image_id="",
+                name="base",
+                ready="false",
+                restart_count=0,
+                state=k8s.V1ContainerState(waiting=k8s.V1ContainerStateWaiting(reason='ErrImagePull')),
+            )
+        ]
+        self.events.append({"type": 'MODIFIED', "object": self.pod})
+        self._run()
+        self.watcher.watcher_queue.put.assert_called()

Review comment:
       ```suggestion
           self.assert_watcher_queue_called_once_with_state(State.FAILED)
   ```



