o-nikolas commented on code in PR #30727:
URL: https://github.com/apache/airflow/pull/30727#discussion_r1183007666
##########
airflow/executors/kubernetes_executor_utils.py:
##########
@@ -0,0 +1,466 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import json
+import multiprocessing
+import time
+from queue import Empty, Queue
+from typing import TYPE_CHECKING, Any
+
+from kubernetes import client, watch
+from kubernetes.client import Configuration, models as k8s
+from kubernetes.client.rest import ApiException
+from urllib3.exceptions import ReadTimeoutError
+
+from airflow.exceptions import AirflowException
+from airflow.kubernetes.kube_client import get_kube_client
+from airflow.kubernetes.kubernetes_helper_functions import annotations_to_key, create_pod_id
+from airflow.kubernetes.pod_generator import PodGenerator
+from airflow.utils.log.logging_mixin import LoggingMixin
+from airflow.utils.state import State
+
+if TYPE_CHECKING:
+    from airflow.executors.kubernetes_executor_types import (
+        KubernetesJobType,
+        KubernetesResultsType,
+        KubernetesWatchType,
+    )
+
+
+from airflow.executors.kubernetes_executor_types import ALL_NAMESPACES, POD_EXECUTOR_DONE_KEY
+
+
+class ResourceVersion:
+    """Singleton for tracking resourceVersion from Kubernetes."""
+
+    _instance: ResourceVersion | None = None
+    resource_version: dict[str, str] = {}
+
+    def __new__(cls):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+
+class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
+    """Watches for Kubernetes jobs."""
+
+    def __init__(
+        self,
+        namespace: str,
+        watcher_queue: Queue[KubernetesWatchType],
+        resource_version: str | None,
+        scheduler_job_id: str,
+        kube_config: Configuration,
+    ):
+        super().__init__()
+        self.namespace = namespace
+        self.scheduler_job_id = scheduler_job_id
+        self.watcher_queue = watcher_queue
+        self.resource_version = resource_version
+        self.kube_config = kube_config
+
+    def run(self) -> None:
+        """Performs watching."""
+        if TYPE_CHECKING:
+            assert self.scheduler_job_id
+
+        kube_client: client.CoreV1Api = get_kube_client()
+        while True:
+            try:
+                self.resource_version = self._run(
+                    kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
+                )
+            except ReadTimeoutError:
+                self.log.warning(
+                    "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
+                )
+                time.sleep(1)
+            except Exception:
+                self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
+                self.resource_version = "0"
+                ResourceVersion().resource_version[self.namespace] = "0"
+                raise
+            else:
+                self.log.warning(
+                    "Watch died gracefully, starting back up with: last resource_version: %s",
+                    self.resource_version,
+                )
+
+    def _pod_events(self, kube_client: client.CoreV1Api, query_kwargs: dict):
+        watcher = watch.Watch()
+        try:
+            if self.namespace == ALL_NAMESPACES:
+                return watcher.stream(kube_client.list_pod_for_all_namespaces, **query_kwargs)
+            else:
+                return watcher.stream(kube_client.list_namespaced_pod, self.namespace, **query_kwargs)
+        except ApiException as e:
+            if e.status == 410:  # Resource version is too old
+                if self.namespace == ALL_NAMESPACES:
+                    pods = kube_client.list_pod_for_all_namespaces(watch=False)
+                else:
+                    pods = kube_client.list_namespaced_pod(namespace=self.namespace, watch=False)
+                resource_version = pods.metadata.resource_version
+                query_kwargs["resource_version"] = resource_version
+                return self._pod_events(kube_client=kube_client, query_kwargs=query_kwargs)
+            else:
+                raise
+
+    def _run(
+        self,
+        kube_client: client.CoreV1Api,
+        resource_version: str | None,
+        scheduler_job_id: str,
+        kube_config: Any,
+    ) -> str | None:
+        self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)
+
+        kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
+        if resource_version:
+            kwargs["resource_version"] = resource_version
+        if kube_config.kube_client_request_args:
+            for key, value in kube_config.kube_client_request_args.items():
+                kwargs[key] = value
+
+        last_resource_version: str | None = None
+
+        for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
+            task = event["object"]
+            self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"])
+            if event["type"] == "ERROR":
+                return self.process_error(event)
+            annotations = task.metadata.annotations
+            task_instance_related_annotations = {
+                "dag_id": annotations["dag_id"],
+                "task_id": annotations["task_id"],
+                "execution_date": annotations.get("execution_date"),
+                "run_id": annotations.get("run_id"),
+                "try_number": annotations["try_number"],
+            }
+            map_index = annotations.get("map_index")
+            if map_index is not None:
+                task_instance_related_annotations["map_index"] = map_index
+
+            self.process_status(
+                pod_name=task.metadata.name,
+                namespace=task.metadata.namespace,
+                status=task.status.phase,
+                annotations=task_instance_related_annotations,
+                resource_version=task.metadata.resource_version,
+                event=event,
+            )
+            last_resource_version = task.metadata.resource_version
+
+        return last_resource_version
+
+    def process_error(self, event: Any) -> str:
+        """Process error response."""
+        self.log.error("Encountered Error response from k8s list namespaced pod stream => %s", event)
+        raw_object = event["raw_object"]
+        if raw_object["code"] == 410:
+            self.log.info(
+                "Kubernetes resource version is too old, must reset to 0 => %s", (raw_object["message"],)
+            )
+            # Return resource version 0
+            return "0"
+        raise AirflowException(
+            f"Kubernetes failure for {raw_object['reason']} with code {raw_object['code']} and message: "
+            f"{raw_object['message']}"
+        )
+
+    def process_status(
+        self,
+        pod_name: str,
+        namespace: str,
+        status: str,
+        annotations: dict[str, str],
+        resource_version: str,
+        event: Any,
+    ) -> None:
+        """Process status response."""
+        if status == "Pending":
+            if event["type"] == "DELETED":
+                self.log.info("Event: Failed to start pod %s", pod_name)
+                self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
+            else:
+                self.log.debug("Event: %s Pending", pod_name)
+        elif status == "Failed":
+            self.log.error("Event: %s Failed", pod_name)
+            self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
+        elif status == "Succeeded":
+            # We get multiple events once the pod hits a terminal state, and we only want to
+            # send it along to the scheduler once.
+            # If our event type is DELETED, we have the POD_EXECUTOR_DONE_KEY, or the pod has
+            # a deletion timestamp, we've already seen the initial Succeeded event and sent it
+            # along to the scheduler.
+            pod = event["object"]
+            if (
+                event["type"] == "DELETED"
+                or POD_EXECUTOR_DONE_KEY in pod.metadata.labels
+                or pod.metadata.deletion_timestamp
+            ):
+                self.log.info(
+                    "Skipping event for Succeeded pod %s - event for this pod already sent to executor",
+                    pod_name,
+                )
+                return
+            self.log.info("Event: %s Succeeded", pod_name)
+            self.watcher_queue.put((pod_name, namespace, None, annotations, resource_version))
+        elif status == "Running":
+            if event["type"] == "DELETED":
+                self.log.info("Event: Pod %s deleted before it could complete", pod_name)
+                self.watcher_queue.put((pod_name, namespace, State.FAILED, annotations, resource_version))
+            else:
+                self.log.info("Event: %s is Running", pod_name)
+        else:
+            self.log.warning(
+                "Event: Invalid state: %s on pod: %s in namespace %s with annotations: %s with "
+                "resource_version: %s",
+                status,
+                pod_name,
+                namespace,
+                annotations,
+                resource_version,
+            )
+
+
+class AirflowKubernetesScheduler(LoggingMixin):
+    """Airflow Scheduler for Kubernetes."""
+
+    def __init__(
+        self,
+        kube_config: Any,
+        result_queue: Queue[KubernetesResultsType],
+        kube_client: client.CoreV1Api,
+        scheduler_job_id: str,
+    ):
+        super().__init__()
+        self.log.debug("Creating Kubernetes executor")
+        self.kube_config = kube_config
+        self.result_queue = result_queue
+        self.namespace = self.kube_config.kube_namespace
+        self.log.debug("Kubernetes using namespace %s", self.namespace)
+        self.kube_client = kube_client
+        self._manager = multiprocessing.Manager()
+        self.watcher_queue = self._manager.Queue()
+        self.scheduler_job_id = scheduler_job_id
+        self.kube_watchers = self._make_kube_watchers()
+
+    def run_pod_async(self, pod: k8s.V1Pod, **kwargs):
+        """Runs POD asynchronously."""
+        sanitized_pod = self.kube_client.api_client.sanitize_for_serialization(pod)
+        json_pod = json.dumps(sanitized_pod, indent=2)
+
+        self.log.debug("Pod Creation Request: \n%s", json_pod)
+        try:
+            resp = self.kube_client.create_namespaced_pod(
+                body=sanitized_pod, namespace=pod.metadata.namespace, **kwargs
+            )
+            self.log.debug("Pod Creation Response: %s", resp)
+        except Exception as e:
+            self.log.exception("Exception when attempting to create Namespaced Pod: %s", json_pod)
+            raise e
+        return resp
+
+    def _make_kube_watcher(self, namespace) -> KubernetesJobWatcher:
+        resource_version = ResourceVersion().resource_version.get(namespace, "0")
+        watcher = KubernetesJobWatcher(
+            watcher_queue=self.watcher_queue,
+            namespace=namespace,
+            resource_version=resource_version,
+            scheduler_job_id=self.scheduler_job_id,
+            kube_config=self.kube_config,
+        )
+        watcher.start()
+        return watcher
+
+    def _make_kube_watchers(self) -> dict[str, KubernetesJobWatcher]:
+        watchers = {}
+        if self.kube_config.multi_namespace_mode:
+            namespaces_to_watch = (
+                self.kube_config.multi_namespace_mode_namespace_list
+                if self.kube_config.multi_namespace_mode_namespace_list
+                else [ALL_NAMESPACES]
+            )
+        else:
+            namespaces_to_watch = [self.kube_config.kube_namespace]
+
+        for namespace in namespaces_to_watch:
+            watchers[namespace] = self._make_kube_watcher(namespace)
+        return watchers
+
+    def _health_check_kube_watchers(self):
+        for namespace, kube_watcher in self.kube_watchers.items():
+            if kube_watcher.is_alive():
+                self.log.debug("KubeJobWatcher for namespace %s alive, continuing", namespace)
+            else:
+                self.log.error(
+                    (
+                        "Error while health checking kube watcher process for namespace %s. "
+                        "Process died for unknown reasons"
+                    ),
+                    namespace,
+                )
+                ResourceVersion().resource_version[namespace] = "0"
+                self.kube_watchers[namespace] = self._make_kube_watcher(namespace)
+
+    def run_next(self, next_job: KubernetesJobType) -> None:
+        """Receives the next job to run, builds the pod, and creates it."""
+        key, command, kube_executor_config, pod_template_file = next_job
+
+        dag_id, task_id, run_id, try_number, map_index = key
+
+        if command[0:3] != ["airflow", "tasks", "run"]:
+            raise ValueError('The command must start with ["airflow", "tasks", "run"].')
+
+        base_worker_pod = get_base_pod_from_template(pod_template_file, self.kube_config)
+
+        if not base_worker_pod:
+            raise AirflowException(
+                f"could not find a valid worker template yaml at {self.kube_config.pod_template_file}"
+            )
+
+        from airflow.kubernetes.pod_generator import PodGenerator

Review Comment:
   Ah yes, this must have been a carry-over from before I decided to create a new module. Good spot, should be harmless but I'll remove it :+1:
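For context on why the duplicate is harmless: `run_next` re-imports `PodGenerator` locally even though the module already imports it at the top of the file (see the diff above). A function-local re-import of an already-loaded module only performs a `sys.modules` cache lookup and binds a local name; the module body is not executed again. A minimal standalone sketch of that behavior (plain Python, not Airflow code, with the standard-library `json` module standing in for `PodGenerator`):

```python
import sys

import json  # module-level import, analogous to PodGenerator at the top of the file


def use_json_again() -> str:
    # Redundant function-local import, analogous to the one flagged above in
    # run_next. Python resolves it from the sys.modules cache, so the module
    # is not re-executed; the only effect is an extra local name binding.
    import json

    assert sys.modules["json"] is json  # both names refer to the same cached module
    return json.dumps({"harmless": True})


print(use_json_again())  # {"harmless": true}
```

So removing the local import is purely cosmetic: `run_next` will simply use the module-level `PodGenerator` binding instead.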
