This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 09ce3a5100 K8s Executor: failing the task in case the watcher receives an event with the reason ProviderFailed (#41186)
09ce3a5100 is described below

commit 09ce3a5100c266369350c85e9f9a0f72ecca9e98
Author: Gopal Dirisala <[email protected]>
AuthorDate: Fri Aug 23 11:44:02 2024 +0530

    K8s Executor: failing the task in case the watcher receives an event with the reason ProviderFailed (#41186)
---
 .../cncf/kubernetes/executors/kubernetes_executor_utils.py     | 10 ++++++++++
 .../cncf/kubernetes/executors/test_kubernetes_executor.py      |  7 +++++++
 2 files changed, 17 insertions(+)

diff --git 
a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index fce631b9b7..a6e49adb93 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -220,6 +220,16 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
             # However, need to free the executor slot from the current 
executor.
             self.log.info("Event: pod %s adopted, annotations: %s", pod_name, 
annotations_string)
             self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations, 
resource_version))
+        elif hasattr(pod.status, "reason") and pod.status.reason == 
"ProviderFailed":
+            # Most likely this happens due to Kubernetes setup (virtual 
kubelet, virtual nodes, etc.)
+            self.log.error(
+                "Event: %s failed to start with reason ProviderFailed, 
annotations: %s",
+                pod_name,
+                annotations_string,
+            )
+            self.watcher_queue.put(
+                (pod_name, namespace, TaskInstanceState.FAILED, annotations, 
resource_version)
+            )
         elif status == "Pending":
             # deletion_timestamp is set by kube server when a graceful 
deletion is requested.
             # since kube server have received request to delete pod set TI 
state failed
diff --git 
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py 
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index e062d77ef6..769eb9c980 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1804,6 +1804,13 @@ class TestKubernetesJobWatcher:
         self._run()
         self.assert_watcher_queue_called_once_with_state(State.FAILED)
 
+    def test_process_status_provider_failed(self):
+        self.pod.status.reason = "ProviderFailed"
+        self.events.append({"type": "MODIFIED", "object": self.pod})
+
+        self._run()
+        self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
     def test_process_status_succeeded(self):
         self.pod.status.phase = "Succeeded"
         self.events.append({"type": "MODIFIED", "object": self.pod})

Reply via email to