This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d470674912 Immediately fail the task in case of worker pod having a fatal container state (#37670)
d470674912 is described below

commit d4706749128c4ec16890541f97c6e607e8eeb86a
Author: Gopal Dirisala <[email protected]>
AuthorDate: Fri Mar 1 17:34:57 2024 +0530

    Immediately fail the task in case of worker pod having a fatal container 
state (#37670)
    
    * fail the task in case of worker pod having fatal container state
    
    * version number updated
---
 .../executors/kubernetes_executor_utils.py         |  28 +++
 airflow/providers/cncf/kubernetes/kube_config.py   |   6 +
 airflow/providers/cncf/kubernetes/provider.yaml    |  10 ++
 .../executors/test_kubernetes_executor.py          | 189 ++++++++++++++++++++-
 4 files changed, 230 insertions(+), 3 deletions(-)

diff --git 
a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py 
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 27bffeb476..db2a4c0a81 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -235,6 +235,34 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
                 self.watcher_queue.put(
                     (pod_name, namespace, TaskInstanceState.FAILED, 
annotations, resource_version)
                 )
+            elif (
+                
self.kube_config.worker_pod_pending_fatal_container_state_reasons
+                and "status" in event["raw_object"]
+            ):
+                self.log.info("Event: %s Pending, annotations: %s", pod_name, 
annotations_string)
+                # Init containers and base container statuses to check.
+                # Skipping the other containers statuses check.
+                container_statuses_to_check = []
+                if "initContainerStatuses" in event["raw_object"]["status"]:
+                    
container_statuses_to_check.extend(event["raw_object"]["status"]["initContainerStatuses"])
+                if "containerStatuses" in event["raw_object"]["status"]:
+                    
container_statuses_to_check.append(event["raw_object"]["status"]["containerStatuses"][0])
+                for container_status in container_statuses_to_check:
+                    container_status_state = container_status["state"]
+                    if "waiting" in container_status_state:
+                        if (
+                            container_status_state["waiting"]["reason"]
+                            in 
self.kube_config.worker_pod_pending_fatal_container_state_reasons
+                        ):
+                            if (
+                                container_status_state["waiting"]["reason"] == 
"ErrImagePull"
+                                and 
container_status_state["waiting"]["message"] == "pull QPS exceeded"
+                            ):
+                                continue
+                            self.watcher_queue.put(
+                                (pod_name, namespace, 
TaskInstanceState.FAILED, annotations, resource_version)
+                            )
+                            break
             else:
                 self.log.debug("Event: %s Pending, annotations: %s", pod_name, 
annotations_string)
         elif status == "Failed":
diff --git a/airflow/providers/cncf/kubernetes/kube_config.py 
b/airflow/providers/cncf/kubernetes/kube_config.py
index 8db861a7f5..7a1de52928 100644
--- a/airflow/providers/cncf/kubernetes/kube_config.py
+++ b/airflow/providers/cncf/kubernetes/kube_config.py
@@ -40,6 +40,12 @@ class KubeConfig:
         self.delete_worker_pods_on_failure = conf.getboolean(
             self.kubernetes_section, "delete_worker_pods_on_failure"
         )
+        self.worker_pod_pending_fatal_container_state_reasons = []
+        if conf.get(self.kubernetes_section, 
"worker_pod_pending_fatal_container_state_reasons", fallback=""):
+            self.worker_pod_pending_fatal_container_state_reasons = conf.get(
+                self.kubernetes_section, 
"worker_pod_pending_fatal_container_state_reasons"
+            ).split(",")
+
         self.worker_pods_creation_batch_size = conf.getint(
             self.kubernetes_section, "worker_pods_creation_batch_size"
         )
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml 
b/airflow/providers/cncf/kubernetes/provider.yaml
index 4300240fd7..2cfbab231b 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -224,6 +224,16 @@ config:
         type: string
         example: ~
         default: "False"
+      worker_pod_pending_fatal_container_state_reasons:
+        description: |
+          If a worker pod is stuck in the pending state because of a fatal container
+          state reason, fail the task and delete the worker pod
+          if delete_worker_pods is True and delete_worker_pods_on_failure is True.
+        version_added: 8.1.0
+        type: string
+        example: ~
+        default: 
'CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError,
+        InvalidImageName'
       worker_pods_creation_batch_size:
         description: |
           Number of Kubernetes Worker Pod creation calls per scheduler loop.
diff --git 
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py 
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index d00342fd98..23774f3b2c 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1488,6 +1488,14 @@ class TestKubernetesJobWatcher:
             scheduler_job_id="123",
             kube_config=mock.MagicMock(),
         )
+        
self.watcher.kube_config.worker_pod_pending_fatal_container_state_reasons = [
+            "CreateContainerConfigError",
+            "CrashLoopBackOff",
+            "ErrImagePull",
+            "CreateContainerError",
+            "ImageInspectError",
+            "InvalidImageName",
+        ]
         self.kube_client = mock.MagicMock()
         self.core_annotations = {
             "dag_id": "dag",
@@ -1532,11 +1540,186 @@ class TestKubernetesJobWatcher:
             )
         )
 
-    def test_process_status_pending(self):
-        self.events.append({"type": "MODIFIED", "object": self.pod})
+    @pytest.mark.parametrize(
+        "raw_object, is_watcher_queue_called",
+        [
+            pytest.param(
+                {
+                    "status": {
+                        "startTime": "2020-05-12T03:49:57Z",
+                        "containerStatuses": [
+                            {
+                                "name": "base",
+                                "state": {
+                                    "waiting": {
+                                        "reason": "CreateContainerConfigError",
+                                        "message": 'secret "my-secret" not 
found',
+                                    }
+                                },
+                                "lastState": {},
+                                "ready": False,
+                                "restartCount": 0,
+                                "image": "dockerhub.com/apache/airflow:latest",
+                                "imageID": "",
+                            }
+                        ],
+                    }
+                },
+                True,
+                id="CreateContainerConfigError",
+            ),
+            pytest.param(
+                {
+                    "status": {
+                        "startTime": "2020-05-12T03:49:57Z",
+                        "containerStatuses": [
+                            {
+                                "name": "base",
+                                "state": {
+                                    "waiting": {"reason": "ErrImagePull", 
"message": "pull QPS exceeded"}
+                                },
+                                "lastState": {},
+                                "ready": False,
+                                "restartCount": 0,
+                                "image": "dockerhub.com/apache/airflow:latest",
+                                "imageID": "",
+                            }
+                        ],
+                    }
+                },
+                False,
+                id="ErrImagePull Image QPS Exceeded",
+            ),
+            pytest.param(
+                {
+                    "status": {
+                        "startTime": "2020-05-12T03:49:57Z",
+                        "containerStatuses": [
+                            {
+                                "name": "base",
+                                "state": {
+                                    "waiting": {
+                                        "reason": "ErrImagePull",
+                                        "message": "rpc error: code = Unknown 
desc = Error response from daemon: manifest for 
dockerhub.com/apache/airflow:xyz not found: manifest unknown: Requested image 
not found",
+                                    }
+                                },
+                                "lastState": {},
+                                "ready": False,
+                                "restartCount": 0,
+                                "image": "dockerhub.com/apache/airflow:xyz",
+                                "imageID": "",
+                            }
+                        ],
+                    }
+                },
+                True,
+                id="ErrImagePull Image Not Found",
+            ),
+            pytest.param(
+                {
+                    "status": {
+                        "startTime": "2020-05-12T03:49:57Z",
+                        "containerStatuses": [
+                            {
+                                "name": "base",
+                                "state": {
+                                    "waiting": {
+                                        "reason": "CreateContainerError",
+                                        "message": 'Error: Error response from 
daemon: create \invalid\path: "\\invalid\path" includes invalid characters for 
a local volume name, only "[a-zA-Z0-9][a-zA-Z0-9_.-]" are allowed. If you 
intended to pass a host directory, use absolute path',
+                                    }
+                                },
+                                "lastState": {},
+                                "ready": False,
+                                "restartCount": 0,
+                                "image": "dockerhub.com/apache/airflow:latest",
+                                "imageID": "",
+                            }
+                        ],
+                    }
+                },
+                True,
+                id="CreateContainerError",
+            ),
+            pytest.param(
+                {
+                    "status": {
+                        "startTime": "2020-05-12T03:49:57Z",
+                        "containerStatuses": [
+                            {
+                                "name": "base",
+                                "state": {
+                                    "waiting": {
+                                        "reason": "ImageInspectError",
+                                        "message": 'Failed to inspect image 
"dockerhub.com/apache/airflow:latest": rpc error: code = Unknown desc = Error 
response from daemon: readlink /var/lib/docker/overlay2: invalid argument',
+                                    }
+                                },
+                                "lastState": {},
+                                "ready": False,
+                                "restartCount": 0,
+                                "image": "dockerhub.com/apache/airflow:latest",
+                                "imageID": "",
+                            }
+                        ],
+                    }
+                },
+                True,
+                id="ImageInspectError",
+            ),
+            pytest.param(
+                {
+                    "status": {
+                        "startTime": "2020-05-12T03:49:57Z",
+                        "containerStatuses": [
+                            {
+                                "name": "base",
+                                "state": {
+                                    "waiting": {
+                                        "reason": "InvalidImageName",
+                                        "message": 'Failed to apply default 
image tag "dockerhub.com/apache/airflow:latest+07": couldnot parse image 
reference "dockerhub.com/apache/airflow:latest+07": invalid reference format',
+                                    }
+                                },
+                                "lastState": {},
+                                "ready": False,
+                                "restartCount": 0,
+                                "image": 
"dockerhub.com/apache/airflow:latest+07",
+                                "imageID": "",
+                            }
+                        ],
+                    }
+                },
+                True,
+                id="InvalidImageName",
+            ),
+            pytest.param(
+                {
+                    "status": {
+                        "startTime": "2020-05-12T03:49:57Z",
+                        "containerStatuses": [
+                            {
+                                "name": "base",
+                                "state": {"waiting": {"reason": 
"OtherReasons", "message": ""}},
+                                "lastState": {},
+                                "ready": False,
+                                "restartCount": 0,
+                                "image": "dockerhub.com/apache/airflow:latest",
+                                "imageID": "",
+                            }
+                        ],
+                    }
+                },
+                False,
+                id="OtherReasons",
+            ),
+        ],
+    )
+    def test_process_status_pending(self, raw_object, is_watcher_queue_called):
+        self.events.append({"type": "MODIFIED", "object": self.pod, 
"raw_object": raw_object})
 
         self._run()
-        self.watcher.watcher_queue.put.assert_not_called()
+        if is_watcher_queue_called:
+            self.assert_watcher_queue_called_once_with_state(State.FAILED)
+        else:
+            self.watcher.watcher_queue.put.assert_not_called()
 
     def test_process_status_pending_deleted(self):
         self.events.append({"type": "DELETED", "object": self.pod})

Reply via email to