jedcunningham commented on a change in pull request #15336:
URL: https://github.com/apache/airflow/pull/15336#discussion_r612504746
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -187,25 +190,35 @@ def process_status(
self,
pod_id: str,
namespace: str,
- status: str,
+ status: V1PodStatus,
Review comment:
Sorry, I led you slightly astray earlier... This will lead to fewer
imports:
```suggestion
status: k8s.V1PodStatus,
```
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -507,3 +506,37 @@ def test_process_status_catchall(self):
self._run()
self.watcher.watcher_queue.put.assert_not_called()
+
+ def test_container_status_of_waiting_with_errimagepull_fails_pod(self):
+ self.pod.status.phase = "Pending"
+ self.pod.status.container_statuses = [
+ k8s.V1ContainerStatus(
+ container_id=None,
+ image="apache/airflow:2.0.1-python3.8",
+ image_id="",
+ name="base",
+ ready="false",
+ restart_count=0,
+ state=k8s.V1ContainerState(waiting=k8s.V1ContainerStateWaiting(reason='ErrImagePull')),
+ )
+ ]
+ self.events.append({"type": 'MODIFIED', "object": self.pod})
+ self._run()
+ self.watcher.watcher_queue.put.assert_called()
Review comment:
```suggestion
self.assert_watcher_queue_called_once_with_state(State.FAILED)
```
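For context on why this assertion is stronger than `assert_called()`: the latter passes no matter which state was queued. Below is a minimal sketch of what such a helper could check. It is not the actual helper from the test class; the function name `assert_queue_called_once_with_state` and the plain-string states are assumptions for illustration, and the tuple layout is taken from the `watcher_queue.put(...)` call in the diff.

```python
from unittest import mock


def assert_queue_called_once_with_state(queue_put: mock.Mock, expected_state: str) -> None:
    """Hypothetical stand-in for the suggested test helper."""
    # Exactly one item must have been queued.
    queue_put.assert_called_once()
    args, _kwargs = queue_put.call_args
    # put() receives a single tuple:
    # (pod_id, namespace, state, annotations, resource_version)
    queued_item = args[0]
    assert queued_item[2] == expected_state, f"unexpected state: {queued_item[2]!r}"


# Usage: a mocked queue that received one FAILED event.
put = mock.Mock()
put(("pod-1", "default", "failed", {}, "42"))
assert_queue_called_once_with_state(put, "failed")
```

With a check like this, a test that queued the wrong state (or queued twice) would fail instead of passing silently.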
##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -187,25 +190,35 @@ def process_status(
self,
pod_id: str,
namespace: str,
- status: str,
+ status: V1PodStatus,
annotations: Dict[str, str],
resource_version: str,
event: Any,
) -> None:
"""Process status response"""
- if status == 'Pending':
- if event['type'] == 'DELETED':
+ pod_status = status.phase
+ if pod_status == 'Pending':
+ # Check container statuses
+ container_statuses = status.container_statuses
+ init_container_statuses = status.init_container_statuses
+ if container_statuses and self._container_image_pull_err(container_statuses):
+ self.log.info('Event: Failed to start pod %s, a container has an ErrImagePull', pod_id)
+ self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))
Review comment:
Yeah, which is why #15263 deletes the pod. I think that is all we can
do, unfortunately.
That said, should we be failing these after the first pull failure? An image
pull failure can certainly resolve itself on a retry, and we haven't actually
started the task yet 🤷‍♂️. I think a timeout would be a more general catch-all
for pods stuck in Pending.
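To make the timeout idea concrete, here is a rough sketch of the kind of check I mean. It is not code from this PR or from #15263; `PENDING_TIMEOUT`, `pending_timed_out`, and the five-minute value are all hypothetical names and numbers chosen for illustration.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical setting: how long a pod may sit in Pending before we fail it.
PENDING_TIMEOUT = timedelta(minutes=5)


def pending_timed_out(
    phase: str,
    created_at: datetime,
    now: Optional[datetime] = None,
    timeout: timedelta = PENDING_TIMEOUT,
) -> bool:
    """Return True only if the pod is still Pending after the timeout elapsed."""
    if phase != "Pending":
        return False
    now = now or datetime.now(timezone.utc)
    return now - created_at > timeout


# Usage: a pod created six minutes ago and still Pending would be failed.
created = datetime(2021, 4, 13, 12, 0, tzinfo=timezone.utc)
print(pending_timed_out("Pending", created, now=created + timedelta(minutes=6)))
```

A check like this would cover ErrImagePull as well as any other reason a pod never leaves Pending, while still giving transient pull failures time to recover.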
##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -507,3 +506,37 @@ def test_process_status_catchall(self):
self._run()
self.watcher.watcher_queue.put.assert_not_called()
+
+ def test_container_status_of_waiting_with_errimagepull_fails_pod(self):
+ self.pod.status.phase = "Pending"
+ self.pod.status.container_statuses = [
+ k8s.V1ContainerStatus(
+ container_id=None,
+ image="apache/airflow:2.0.1-python3.8",
+ image_id="",
+ name="base",
+ ready="false",
+ restart_count=0,
+ state=k8s.V1ContainerState(waiting=k8s.V1ContainerStateWaiting(reason='ErrImagePull')),
+ )
+ ]
+ self.events.append({"type": 'MODIFIED', "object": self.pod})
+ self._run()
+ self.watcher.watcher_queue.put.assert_called()
+
+ def test_init_container_status_of_waiting_with_errimagepull_fails_pod(self):
+ self.pod.status.phase = "Pending"
+ self.pod.status.init_container_statuses = [
+ k8s.V1ContainerStatus(
+ container_id=None,
+ image="apache/airflow:2.0.1-python3.8",
+ image_id="",
+ name="base",
+ ready="false",
+ restart_count=0,
+ state=k8s.V1ContainerState(waiting=k8s.V1ContainerStateWaiting(reason='ErrImagePull')),
+ )
+ ]
+ self.events.append({"type": 'MODIFIED', "object": self.pod})
+ self._run()
+ self.watcher.watcher_queue.put.assert_called()
Review comment:
```suggestion
self.assert_watcher_queue_called_once_with_state(State.FAILED)
```