aptenodytes-forsteri opened a new issue, #39625: URL: https://github.com/apache/airflow/issues/39625
### Apache Airflow Provider(s)

cncf-kubernetes

### Versions of Apache Airflow Providers

_No response_

### Apache Airflow version

airflow-2.7.3

### Operating System

linux

### Deployment

Google Cloud Composer

### Deployment details

_No response_

### What happened

I created a DAG with a KubernetesPodOperator that uses annotations to mount GCS buckets through the Google Cloud Storage FUSE Container Storage Interface (CSI) plugin.

Due to what I believe to be another bug, https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver/issues/257, the GCS FUSE sidecar would sometimes stay running.

From log messages, I determined the operator was stuck in an infinite loop here: https://github.com/apache/airflow/blob/029cbaec174b73370e7c4ef2d7ec76e7be333400/airflow/providers/cncf/kubernetes/utils/pod_manager.py#L623

This appears to be a somewhat known issue, as the above function has a special case for the istio sidecar. The same treatment should apply to *all* sidecars.

### What you think should happen instead

It should not be possible for a "rogue" sidecar container to cause the KubernetesPodOperator to end up in an infinite loop. This behavior hogs cluster resources and eventually clogs up the whole cluster with zombie pods.

One possible fix would be an optional timeout for how long to wait for the pod to do its work.

Another possible fix would be to generalize the istio sidecar treatment to all types of sidecars.

### How to reproduce

1. Set up a Google Cloud Composer environment.
2. Create a DAG that mounts two buckets ("in" and "out") using the GCS FUSE CSI driver:

```python
import json
import time
from datetime import datetime, timedelta
from functools import cached_property

from airflow.decorators import dag, task
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.utils.pod_manager import (PodManager, PodPhase,
                                                                 container_is_completed)
from airflow.utils.dates import days_ago
from postgres import Postgres
from kubernetes import client
from kubernetes.client import models as k8s
from kubernetes.client.models.v1_pod import V1Pod

# Volumes and volume mounts for the 'in' and 'out' GCS buckets.
VOLUMES = [
    client.V1Volume(
        name="gcs-fuse-in",
        csi=client.V1CSIVolumeSource(
            driver="gcsfuse.csi.storage.gke.io",
            volume_attributes={
                "bucketName": "in-data",
                "mountOptions": "implicit-dirs",
                "fileCacheCapacity": "0"
            },
        ),
    )
]
VOLUME_MOUNTS = [client.V1VolumeMount(name="gcs-fuse-in", mount_path="/data")]

out_bucket = "out-data"
if out_bucket != "":
    VOLUMES.append(
        client.V1Volume(
            name="gcs-fuse-out",
            csi=client.V1CSIVolumeSource(
                driver="gcsfuse.csi.storage.gke.io",
                volume_attributes={
                    "bucketName": out_bucket,
                    "mountOptions": "implicit-dirs",
                    "fileCacheCapacity": "0"
                })))
    VOLUME_MOUNTS.append(
        client.V1VolumeMount(name="gcs-fuse-out", mount_path="/out"))


# START_DATE, END_DATE, fetch_new_log_ids and generate_entry_point_commands
# are not shown in this report.
@dag(
    dag_id="example_dag",
    description="Attempts to reproduce hanging pod issue.",
    default_args={
        'owner': 'cloud_engineering',
        'email_on_failure': False,
        'retries': 1,
        'retry_delay': timedelta(seconds=10),
    },
    start_date=START_DATE,
    end_date=END_DATE,
    schedule_interval=timedelta(seconds=30),
    catchup=True,
)
def example_dag():
    log_ids = fetch_new_log_ids()
    container_entry_point_commands = generate_entry_point_commands(log_ids)

    KubernetesPodOperator.partial(
        name="process_log_example_dag",
        namespace="composer-user-workloads",
        config_file="/home/airflow/composer_kube_config",
        kubernetes_conn_id="kubernetes_default",
        task_id="process_log",
        image="us-central1-docker.pkg.dev/project/images/example-image",
        get_logs=True,
        log_events_on_failure=True,
        do_xcom_push=False,
        volumes=VOLUMES,
        volume_mounts=VOLUME_MOUNTS,
        # GCS Fuse CSI driver relies on pod annotations to configure itself
        # and the container sidecar it runs in.
        annotations={
            "gke-gcsfuse/volumes": "true",
            "gke-gcsfuse/ephemeral-storage-limit": "1Gi",
            "gke-gcsfuse/cpu-request": "500m",
            "gke-gcsfuse/memory-request": "1Gi",
            "gke-gcsfuse/ephemeral-storage-request": "1Gi",
        },
        container_resources=k8s.V1ResourceRequirements(
            limits={
                'memory': "3Gi",
                'cpu': "1",
                'ephemeral-storage': "0Gi"
            },
            requests={
                'memory': "3Gi",
                'cpu': "1",
                'ephemeral-storage': "0Gi"
            },
        ),
    ).expand(cmds=container_entry_point_commands)


example_dag()
```

The container runs a simple Python script that writes 100 files to /out and then raises an exception:

```python
def main():
    for i in range(0, 100):
        with open(f"/out/{i}.txt", "w", encoding="utf8") as f:
            f.write(f"{i}")
    raise Exception("oops")


if __name__ == "__main__":
    main()
```

### Anything else

I can work around the issue with:

```python
class MyPodManager(PodManager):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def await_pod_completion(self,
                             pod: V1Pod,
                             istio_enabled: bool = False,
                             container_name: str = "base") -> V1Pod:
        # Give up after two hours so a stuck pod cannot block the task forever.
        start = time.time()
        while time.time() - start < 7200:
            remote_pod = self.read_pod(pod)
            if remote_pod.status.phase in PodPhase.terminal_states:
                break
            if istio_enabled and container_is_completed(remote_pod, container_name):
                break
            if container_is_completed(remote_pod, container_name):
                # Always break if the container is completed, even if sidecar
                # containers continue to stay up and keep the pod up.
                self.log.info("Pod %s should terminate now", pod.metadata.name)
                break
            self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
            time.sleep(2)
        return remote_pod


class MyKubernetesPodOperator(KubernetesPodOperator):

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    @cached_property
    def pod_manager(self) -> PodManager:
        return MyPodManager(kube_client=self.client,
                            callbacks=self.callbacks,
                            progress_callback=self._progress_callback)
```

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
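For illustration only, the two fixes proposed under "What you think should happen instead" (an optional timeout, and treating every sidecar the way the istio sidecar is treated) can be combined in one polling loop. The sketch below is independent of Airflow and is not the provider's implementation: `PodSnapshot`, `await_main_container`, and the `read_pod`, `clock` and `sleep` parameters are hypothetical names introduced here so the logic can be run without a cluster.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Dict, Optional

# Pod phases after which Kubernetes will not run the pod any further.
TERMINAL_PHASES = {"Succeeded", "Failed"}


@dataclass
class PodSnapshot:
    """Hypothetical stand-in for the parts of a pod status the loop reads."""
    phase: str
    # Container name -> True once that container has terminated.
    terminated: Dict[str, bool] = field(default_factory=dict)


def await_main_container(
    read_pod: Callable[[], PodSnapshot],
    container_name: str = "base",
    timeout: Optional[float] = None,
    poll_interval: float = 2.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> PodSnapshot:
    """Poll until the pod is terminal or the main container is done.

    Sidecars are ignored: only `container_name` decides completion.
    If `timeout` (seconds) is set and expires first, TimeoutError is raised.
    """
    deadline = None if timeout is None else clock() + timeout
    while True:
        pod = read_pod()
        if pod.phase in TERMINAL_PHASES:
            return pod
        if pod.terminated.get(container_name, False):
            # The main container finished; a lingering sidecar may still
            # keep the pod in the Running phase, so stop waiting here.
            return pod
        if deadline is not None and clock() >= deadline:
            raise TimeoutError(
                f"container {container_name!r} did not finish within {timeout}s")
        sleep(poll_interval)


if __name__ == "__main__":
    # Simulated pod: the main container finishes while a sidecar keeps running.
    snapshots = iter([
        PodSnapshot("Running"),
        PodSnapshot("Running", {"base": True, "sidecar": False}),
    ])
    final = await_main_container(lambda: next(snapshots), sleep=lambda _s: None)
    print(final.phase, final.terminated)
```

Passing `clock` and `sleep` as parameters is only a convenience for testing the loop with simulated time. In an operator, the timeout would more naturally be an optional argument that defaults to no limit, so existing DAGs keep their current behavior.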
