This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e2d069f  Set task state to failed when pod is DELETED while running 
(#18095)
e2d069f is described below

commit e2d069f3c78a45ca29bc21b25a9e96b4e36a5d86
Author: lindsable <47788186+lindsa...@users.noreply.github.com>
AuthorDate: Sat Sep 11 16:18:32 2021 -0400

    Set task state to failed when pod is DELETED while running (#18095)
    
    There is a bug in the Kubernetes Job Watcher that occurs when a node with a
    running worker pod is removed from the cluster. If the worker pod doesn't
    complete before the node is removed, it is orphaned and force-deleted by the
    garbage collector. The API reports this as an event with type='DELETED'
    whose pod status phase is still 'Running'.
    
    Because the Job Watcher's if statement doesn't check the event type, the
    last information it records about the pod is that it is running. The
    running scheduler therefore never learns that the pod is gone and shows the
    task as stuck in a queued state. The situation is only resolved when the
    scheduler/executor restarts and this function is run again.
---
 airflow/executors/kubernetes_executor.py    | 6 +++++-
 tests/executors/test_kubernetes_executor.py | 7 +++++++
 2 files changed, 12 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py
index 993787c..5e748da 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -206,7 +206,11 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
             self.log.info('Event: %s Succeeded', pod_id)
             self.watcher_queue.put((pod_id, namespace, None, annotations, 
resource_version))
         elif status == 'Running':
-            self.log.info('Event: %s is Running', pod_id)
+            if event['type'] == 'DELETED':
+                self.log.info('Event: Pod %s deleted before it could 
complete', pod_id)
+                self.watcher_queue.put((pod_id, namespace, State.FAILED, 
annotations, resource_version))
+            else:
+                self.log.info('Event: %s is Running', pod_id)
         else:
             self.log.warning(
                 'Event: Invalid state: %s on pod: %s in namespace %s with 
annotations: %s with '
diff --git a/tests/executors/test_kubernetes_executor.py b/tests/executors/test_kubernetes_executor.py
index 025b956..2fa0b8a 100644
--- a/tests/executors/test_kubernetes_executor.py
+++ b/tests/executors/test_kubernetes_executor.py
@@ -738,6 +738,13 @@ class TestKubernetesJobWatcher(unittest.TestCase):
         self._run()
         self.assert_watcher_queue_called_once_with_state(None)
 
+    def test_process_status_running_deleted(self):
+        self.pod.status.phase = "Running"
+        self.events.append({"type": 'DELETED', "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
     def test_process_status_running(self):
         self.pod.status.phase = "Running"
         self.events.append({"type": 'MODIFIED', "object": self.pod})

Reply via email to