jedcunningham commented on a change in pull request #15336:
URL: https://github.com/apache/airflow/pull/15336#discussion_r611981896



##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -218,6 +239,34 @@ def process_status(
                 resource_version,
             )
 
+    def process_container_statuses(
+        self,
+        pod_id: str,
+        statuses: List[Any],
+        namespace: str,
+        annotations: Dict[str, str],
+        resource_version: str,
+    ):
+        """Monitor pod container statuses"""
+        for container_status in statuses:
+            terminated = container_status.state.terminated
+            waiting = container_status.state.waiting
+            if terminated:
+                self.log.debug(
+                    "A container in the pod %s has terminated, reason: %s, message: %s",
+                    pod_id,
+                    terminated.reason,
+                    terminated.message,
+                )
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, annotations, resource_version))

Review comment:
       I don't think we should be adding to `watcher_queue` more than once, right? It might be better to leave the queue handling to `process_status` and just return a bool; there's less to cart around then, too. Maybe something like this:
   
   ```
   def _has_terminated_containers(self, status: V1PodStatus) -> bool:
   ```
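A minimal sketch of the shape suggested above. It is written as module-level functions rather than executor methods, and it uses hypothetical dataclass stand-ins instead of the real `kubernetes.client` models. Only the attribute names (`container_statuses`, `state`, `terminated`) follow the real V1* classes. `has_terminated_containers` and the simplified `process_status` are illustrations, not the PR's code. The point is that the helper only answers yes/no, and the queue is written in exactly one place.

```python
from dataclasses import dataclass
from queue import Queue
from typing import List, Optional


# Hypothetical stand-ins mirroring only the attributes of the kubernetes client's
# V1PodStatus / V1ContainerStatus / V1ContainerState / V1ContainerStateTerminated
# that the helper below reads.
@dataclass
class ContainerStateTerminated:
    exit_code: int
    reason: Optional[str] = None
    message: Optional[str] = None


@dataclass
class ContainerState:
    terminated: Optional[ContainerStateTerminated] = None


@dataclass
class ContainerStatus:
    name: str
    state: ContainerState


@dataclass
class PodStatus:
    phase: str
    container_statuses: Optional[List[ContainerStatus]] = None


def has_terminated_containers(status: PodStatus) -> bool:
    """Return True if any container in the pod reports a terminated state."""
    # container_statuses may be None before any container state is reported.
    return any(
        cs.state is not None and cs.state.terminated is not None
        for cs in status.container_statuses or []
    )


def process_status(pod_id: str, status: PodStatus, watcher_queue: Queue) -> None:
    """Simplified, hypothetical caller: queue handling stays in one place."""
    if has_terminated_containers(status):
        # One FAILED event per pod, however many containers terminated.
        watcher_queue.put((pod_id, "failed"))


watcher_queue: Queue = Queue()
two_terminated = PodStatus(
    phase="Running",
    container_statuses=[
        ContainerStatus("base", ContainerState(ContainerStateTerminated(exit_code=1))),
        ContainerStatus("sidecar", ContainerState(ContainerStateTerminated(exit_code=1))),
    ],
)
process_status("example-pod", two_terminated, watcher_queue)
print(watcher_queue.qsize())  # 1
```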

##########
File path: airflow/executors/kubernetes_executor.py
##########
@@ -187,25 +188,45 @@ def process_status(
         self,
         pod_id: str,
         namespace: str,
-        status: str,
+        status: Any,

Review comment:
       ```suggestion
           status: k8s.V1PodStatus,
   ```

##########
File path: tests/executors/test_kubernetes_executor.py
##########
@@ -507,3 +506,113 @@ def test_process_status_catchall(self):
 
         self._run()
         self.watcher.watcher_queue.put.assert_not_called()
+
+    def test_container_status_of_terminating_fails_pod(self):
+        self.pod.status.phase = "Pending"
+        self.pod.status.container_statuses = [
+            k8s.V1ContainerStatus(
+                container_id=None,
+                image="apache/airflow:2.0.1-python3.8",
+                image_id="",
+                name="base",
+                ready="false",
+                restart_count=0,
+                state=k8s.V1ContainerState(
+                    terminated=k8s.V1ContainerStateTerminated(
+                        reason="Terminating", exit_code=1

Review comment:
       Have you seen, or can you recreate, a `phase=Pending` pod with a `state.terminated` container? I don't see how it is possible to have both.
   
   I've tried a few scenarios with both init containers and sidecars, and in every case the watcher marked the pod as failed (though maybe not immediately, because `phase=Running`); however, the TI still gets marked as success.
   
   Said another way, I think there are bugs around here, but I don't think looking at pods in `phase=Pending` will help.
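To make the phase question concrete, here is a minimal sketch (not from the PR) of a phase-independent check. It scans both init and app container statuses and counts only non-zero exit codes, because a container that terminated with exit code 0 completed normally. The dataclasses are hypothetical stand-ins that mirror the attribute names of the kubernetes client's `V1PodStatus` (`phase`, `container_statuses`, `init_container_statuses`) and related models. `failed_container_names` is an illustrative name.

```python
from dataclasses import dataclass
from typing import List, Optional


# Hypothetical stand-ins mirroring attribute names on the kubernetes client's
# V1PodStatus / V1ContainerStatus / V1ContainerState / V1ContainerStateTerminated.
@dataclass
class Terminated:
    exit_code: int
    reason: Optional[str] = None


@dataclass
class ContainerState:
    terminated: Optional[Terminated] = None


@dataclass
class ContainerStatus:
    name: str
    state: ContainerState


@dataclass
class PodStatus:
    phase: str
    container_statuses: Optional[List[ContainerStatus]] = None
    init_container_statuses: Optional[List[ContainerStatus]] = None


def failed_container_names(status: PodStatus) -> List[str]:
    """Names of init or app containers that terminated with a non-zero exit code.

    status.phase is deliberately not consulted: a sidecar can exit non-zero
    while the main container keeps the pod in phase=Running.
    """
    all_statuses = (status.init_container_statuses or []) + (status.container_statuses or [])
    return [
        cs.name
        for cs in all_statuses
        if cs.state is not None
        and cs.state.terminated is not None
        and cs.state.terminated.exit_code != 0
    ]


# Example: the main container is still running, the sidecar has failed.
running_pod = PodStatus(
    phase="Running",
    container_statuses=[
        ContainerStatus("base", ContainerState()),
        ContainerStatus("sidecar", ContainerState(Terminated(exit_code=1, reason="Error"))),
    ],
)
print(failed_container_names(running_pod))  # ['sidecar']
```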




-- 
This is an automated message from the Apache Git Service.