This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 09ce3a5100 K8s Executor: failing the task in case the watcher receives an event with the reason ProviderFailed (#41186)
09ce3a5100 is described below
commit 09ce3a5100c266369350c85e9f9a0f72ecca9e98
Author: Gopal Dirisala <[email protected]>
AuthorDate: Fri Aug 23 11:44:02 2024 +0530
K8s Executor: failing the task in case the watcher receives an event with the reason ProviderFailed (#41186)
---
.../cncf/kubernetes/executors/kubernetes_executor_utils.py | 10 ++++++++++
.../cncf/kubernetes/executors/test_kubernetes_executor.py | 7 +++++++
2 files changed, 17 insertions(+)
diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index fce631b9b7..a6e49adb93 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -220,6 +220,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
# However, need to free the executor slot from the current
executor.
self.log.info("Event: pod %s adopted, annotations: %s", pod_name,
annotations_string)
self.watcher_queue.put((pod_name, namespace, ADOPTED, annotations,
resource_version))
+ elif hasattr(pod.status, "reason") and pod.status.reason ==
"ProviderFailed":
+ # Most likely this happens due to Kubernetes setup (virtual
kubelet, virtual nodes, etc.)
+ self.log.error(
+ "Event: %s failed to start with reason ProviderFailed,
annotations: %s",
+ pod_name,
+ annotations_string,
+ )
+ self.watcher_queue.put(
+ (pod_name, namespace, TaskInstanceState.FAILED, annotations,
resource_version)
+ )
elif status == "Pending":
# deletion_timestamp is set by kube server when a graceful
deletion is requested.
# since kube server have received request to delete pod set TI
state failed
diff --git a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index e062d77ef6..769eb9c980 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1804,6 +1804,13 @@ class TestKubernetesJobWatcher:
self._run()
self.assert_watcher_queue_called_once_with_state(State.FAILED)
+ def test_process_status_provider_failed(self):
+ self.pod.status.reason = "ProviderFailed"
+ self.events.append({"type": "MODIFIED", "object": self.pod})
+
+ self._run()
+ self.assert_watcher_queue_called_once_with_state(State.FAILED)
+
def test_process_status_succeeded(self):
self.pod.status.phase = "Succeeded"
self.events.append({"type": "MODIFIED", "object": self.pod})