This is an automated email from the ASF dual-hosted git repository.
husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d470674912 Immediately fail the task in case of worker pod having a
fatal container state (#37670)
d470674912 is described below
commit d4706749128c4ec16890541f97c6e607e8eeb86a
Author: Gopal Dirisala <[email protected]>
AuthorDate: Fri Mar 1 17:34:57 2024 +0530
Immediately fail the task in case of worker pod having a fatal container
state (#37670)
* Fail the task when a worker pod has a fatal container state
* Update the version number
---
.../executors/kubernetes_executor_utils.py | 28 +++
airflow/providers/cncf/kubernetes/kube_config.py | 6 +
airflow/providers/cncf/kubernetes/provider.yaml | 10 ++
.../executors/test_kubernetes_executor.py | 189 ++++++++++++++++++++-
4 files changed, 230 insertions(+), 3 deletions(-)
diff --git
a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
index 27bffeb476..db2a4c0a81 100644
--- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
+++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py
@@ -235,6 +235,34 @@ class KubernetesJobWatcher(multiprocessing.Process,
LoggingMixin):
self.watcher_queue.put(
(pod_name, namespace, TaskInstanceState.FAILED,
annotations, resource_version)
)
+ elif (
+
self.kube_config.worker_pod_pending_fatal_container_state_reasons
+ and "status" in event["raw_object"]
+ ):
+ self.log.info("Event: %s Pending, annotations: %s", pod_name,
annotations_string)
+ # Init containers and base container statuses to check.
+ # Skipping the other containers statuses check.
+ container_statuses_to_check = []
+ if "initContainerStatuses" in event["raw_object"]["status"]:
+
container_statuses_to_check.extend(event["raw_object"]["status"]["initContainerStatuses"])
+ if "containerStatuses" in event["raw_object"]["status"]:
+
container_statuses_to_check.append(event["raw_object"]["status"]["containerStatuses"][0])
+ for container_status in container_statuses_to_check:
+ container_status_state = container_status["state"]
+ if "waiting" in container_status_state:
+ if (
+ container_status_state["waiting"]["reason"]
+ in
self.kube_config.worker_pod_pending_fatal_container_state_reasons
+ ):
+ if (
+ container_status_state["waiting"]["reason"] ==
"ErrImagePull"
+ and
container_status_state["waiting"]["message"] == "pull QPS exceeded"
+ ):
+ continue
+ self.watcher_queue.put(
+ (pod_name, namespace,
TaskInstanceState.FAILED, annotations, resource_version)
+ )
+ break
else:
self.log.debug("Event: %s Pending, annotations: %s", pod_name,
annotations_string)
elif status == "Failed":
diff --git a/airflow/providers/cncf/kubernetes/kube_config.py
b/airflow/providers/cncf/kubernetes/kube_config.py
index 8db861a7f5..7a1de52928 100644
--- a/airflow/providers/cncf/kubernetes/kube_config.py
+++ b/airflow/providers/cncf/kubernetes/kube_config.py
@@ -40,6 +40,12 @@ class KubeConfig:
self.delete_worker_pods_on_failure = conf.getboolean(
self.kubernetes_section, "delete_worker_pods_on_failure"
)
+ self.worker_pod_pending_fatal_container_state_reasons = []
+ if conf.get(self.kubernetes_section,
"worker_pod_pending_fatal_container_state_reasons", fallback=""):
+ self.worker_pod_pending_fatal_container_state_reasons = conf.get(
+ self.kubernetes_section,
"worker_pod_pending_fatal_container_state_reasons"
+ ).split(",")
+
self.worker_pods_creation_batch_size = conf.getint(
self.kubernetes_section, "worker_pods_creation_batch_size"
)
diff --git a/airflow/providers/cncf/kubernetes/provider.yaml
b/airflow/providers/cncf/kubernetes/provider.yaml
index 4300240fd7..2cfbab231b 100644
--- a/airflow/providers/cncf/kubernetes/provider.yaml
+++ b/airflow/providers/cncf/kubernetes/provider.yaml
@@ -224,6 +224,16 @@ config:
type: string
example: ~
default: "False"
+ worker_pod_pending_fatal_container_state_reasons:
+ description: |
+ Comma-separated list of container "waiting" reasons that are fatal for a pending worker pod. If any init
+ container, or the first (base) container, is waiting with one of these reasons, the task is failed
+ immediately (ErrImagePull with the message "pull QPS exceeded" is ignored) and the worker pod is deleted if
+ delete_worker_pods and delete_worker_pods_on_failure are True. Do not put spaces around the commas; an empty value disables the check.
+ version_added: 8.1.0
+ type: string
+ example: ~
+ default: 'CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError,InvalidImageName'
worker_pods_creation_batch_size:
description: |
Number of Kubernetes Worker Pod creation calls per scheduler loop.
diff --git
a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
index d00342fd98..23774f3b2c 100644
--- a/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
+++ b/tests/providers/cncf/kubernetes/executors/test_kubernetes_executor.py
@@ -1488,6 +1488,14 @@ class TestKubernetesJobWatcher:
scheduler_job_id="123",
kube_config=mock.MagicMock(),
)
+
self.watcher.kube_config.worker_pod_pending_fatal_container_state_reasons = [
+ "CreateContainerConfigError",
+ "CrashLoopBackOff",
+ "ErrImagePull",
+ "CreateContainerError",
+ "ImageInspectError",
+ "InvalidImageName",
+ ]
self.kube_client = mock.MagicMock()
self.core_annotations = {
"dag_id": "dag",
@@ -1532,11 +1540,186 @@ class TestKubernetesJobWatcher:
)
)
- def test_process_status_pending(self):
- self.events.append({"type": "MODIFIED", "object": self.pod})
+ @pytest.mark.parametrize(
+ "raw_object, is_watcher_queue_called",
+ [
+ pytest.param(
+ {
+ "status": {
+ "startTime": "2020-05-12T03:49:57Z",
+ "containerStatuses": [
+ {
+ "name": "base",
+ "state": {
+ "waiting": {
+ "reason": "CreateContainerConfigError",
+ "message": 'secret "my-secret" not
found',
+ }
+ },
+ "lastState": {},
+ "ready": False,
+ "restartCount": 0,
+ "image": "dockerhub.com/apache/airflow:latest",
+ "imageID": "",
+ }
+ ],
+ }
+ },
+ True,
+ id="CreateContainerConfigError",
+ ),
+ pytest.param(
+ {
+ "status": {
+ "startTime": "2020-05-12T03:49:57Z",
+ "containerStatuses": [
+ {
+ "name": "base",
+ "state": {
+ "waiting": {"reason": "ErrImagePull",
"message": "pull QPS exceeded"}
+ },
+ "lastState": {},
+ "ready": False,
+ "restartCount": 0,
+ "image": "dockerhub.com/apache/airflow:latest",
+ "imageID": "",
+ }
+ ],
+ }
+ },
+ False,
+ id="ErrImagePull Image QPS Exceeded",
+ ),
+ pytest.param(
+ {
+ "status": {
+ "startTime": "2020-05-12T03:49:57Z",
+ "containerStatuses": [
+ {
+ "name": "base",
+ "state": {
+ "waiting": {
+ "reason": "ErrImagePull",
+ "message": "rpc error: code = Unknown
desc = Error response from daemon: manifest for
dockerhub.com/apache/airflow:xyz not found: manifest unknown: Requested image
not found",
+ }
+ },
+ "lastState": {},
+ "ready": False,
+ "restartCount": 0,
+ "image": "dockerhub.com/apache/airflow:xyz",
+ "imageID": "",
+ }
+ ],
+ }
+ },
+ True,
+ id="ErrImagePull Image Not Found",
+ ),
+ pytest.param(
+ {
+ "status": {
+ "startTime": "2020-05-12T03:49:57Z",
+ "containerStatuses": [
+ {
+ "name": "base",
+ "state": {
+ "waiting": {
+ "reason": "CreateContainerError",
+ "message": 'Error: Error response from
daemon: create \invalid\path: "\\invalid\path" includes invalid characters for
a local volume name, only "[a-zA-Z0-9][a-zA-Z0-9_.-]" are allowed. If you
intended to pass a host directory, use absolute path',
+ }
+ },
+ "lastState": {},
+ "ready": False,
+ "restartCount": 0,
+ "image": "dockerhub.com/apache/airflow:latest",
+ "imageID": "",
+ }
+ ],
+ }
+ },
+ True,
+ id="CreateContainerError",
+ ),
+ pytest.param(
+ {
+ "status": {
+ "startTime": "2020-05-12T03:49:57Z",
+ "containerStatuses": [
+ {
+ "name": "base",
+ "state": {
+ "waiting": {
+ "reason": "ImageInspectError",
+ "message": 'Failed to inspect image
"dockerhub.com/apache/airflow:latest": rpc error: code = Unknown desc = Error
response from daemon: readlink /var/lib/docker/overlay2: invalid argument',
+ }
+ },
+ "lastState": {},
+ "ready": False,
+ "restartCount": 0,
+ "image": "dockerhub.com/apache/airflow:latest",
+ "imageID": "",
+ }
+ ],
+ }
+ },
+ True,
+ id="ImageInspectError",
+ ),
+ pytest.param(
+ {
+ "status": {
+ "startTime": "2020-05-12T03:49:57Z",
+ "containerStatuses": [
+ {
+ "name": "base",
+ "state": {
+ "waiting": {
+ "reason": "InvalidImageName",
+ "message": 'Failed to apply default
image tag "dockerhub.com/apache/airflow:latest+07": couldnot parse image
reference "dockerhub.com/apache/airflow:latest+07": invalid reference format',
+ }
+ },
+ "lastState": {},
+ "ready": False,
+ "restartCount": 0,
+ "image":
"dockerhub.com/apache/airflow:latest+07",
+ "imageID": "",
+ }
+ ],
+ }
+ },
+ True,
+ id="InvalidImageName",
+ ),
+ pytest.param(
+ {
+ "status": {
+ "startTime": "2020-05-12T03:49:57Z",
+ "containerStatuses": [
+ {
+ "name": "base",
+ "state": {"waiting": {"reason":
"OtherReasons", "message": ""}},
+ "lastState": {},
+ "ready": False,
+ "restartCount": 0,
+ "image": "dockerhub.com/apache/airflow:latest",
+ "imageID": "",
+ }
+ ],
+ }
+ },
+ False,
+ id="OtherReasons",
+ ),
+ ],
+ )
+ def test_process_status_pending(self, raw_object, is_watcher_queue_called):
+ self.events.append({"type": "MODIFIED", "object": self.pod,
"raw_object": raw_object})
self._run()
- self.watcher.watcher_queue.put.assert_not_called()
+ if is_watcher_queue_called:
+ self.assert_watcher_queue_called_once_with_state(State.FAILED)
+ else:
+ self.watcher.watcher_queue.put.assert_not_called()
def test_process_status_pending_deleted(self):
self.events.append({"type": "DELETED", "object": self.pod})