aptenodytes-forsteri opened a new issue, #39625:
URL: https://github.com/apache/airflow/issues/39625

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Apache Airflow version
   
   airflow-2.7.3
   
   ### Operating System
   
   linux
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   I created a dag with a KubernetesPodOperator which uses the annotations to 
mount GCS buckets using the Google Cloud Storage FUSE Container Storage 
Interface (CSI) Plugin.
   
   Due to what I believe to be another bug, 
https://github.com/GoogleCloudPlatform/gcs-fuse-csi-driver/issues/257, the GCS 
FUSE sidecar would sometimes stay running.
   
   From log messages, I determined the operator was stuck in an infinite loop 
here: 
https://github.com/apache/airflow/blob/029cbaec174b73370e7c4ef2d7ec76e7be333400/airflow/providers/cncf/kubernetes/utils/pod_manager.py#L623
   
   This appears to be a somewhat known issue, as the above function seems to 
have a special case for the istio sidecar. The same treatment should hold for 
*all* sidecars.
   
   
   ### What you think should happen instead
   
   It should not be possible for a "rogue" sidecar container to cause the 
KubernetesPodOperator to end up in an infinite loop. This behavior ends up 
hogging resources on a cluster and eventually clogs up the whole cluster with 
zombie pods.
   
   One possible fix would be an optional timeout for how long to wait for the 
pod to do its work.
   
   Another possible fix would be to generalize the treatment for the istio 
sidecar for all types of sidecars.
   
   ### How to reproduce
   
   1. Set up a Google cloud composer environment
   2. Create a dag that mounts two buckets ("in" and "out" using the GCS FUSE 
csi driver)
   
   ```python
   Import json
   import time
   from datetime import datetime, timedelta
   from functools import cached_property
   
   from airflow.decorators import dag, task
   from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import 
KubernetesPodOperator
   from airflow.providers.cncf.kubernetes.utils.pod_manager import (PodManager, 
PodPhase,
                                                                   
container_is_completed)
   from airflow.utils.dates import days_ago
   from postgres import Postgres
   
   from kubernetes import client
   from kubernetes.client import models as k8s
   from kubernetes.client.models.v1_pod import V1Pod
   
   
   
   # Volume mounts for the 'in' and 'out' GCS bucket.
   VOLUMES = [
       client.V1Volume(
           name="gcs-fuse-in",
           csi=client.V1CSIVolumeSource(
               driver="gcsfuse.csi.storage.gke.io",
               volume_attributes={
                   "bucketName": "in-data",
                   "mountOptions": "implicit-dirs",
                   "fileCacheCapacity": "0"
               },
           ),
       )
   ]
   VOLUME_MOUNTS = [client.V1VolumeMount(name="gcs-fuse-in", 
mount_path="/data")]
   
   out_bucket = "out-data"
   if out_bucket != "":
       VOLUMES.append(
           client.V1Volume(
               name="gcs-fuse-out", csi=client.V1CSIVolumeSource(
                   driver="gcsfuse.csi.storage.gke.io", volume_attributes={
                       "bucketName": out_bucket,
                       "mountOptions": "implicit-dirs",
                       "fileCacheCapacity": "0"
                   })))
   
       VOLUME_MOUNTS.append(
           client.V1VolumeMount(name="gcs-fuse-out", mount_path="/out"))
   
   
   @dag(
       dag_id="example_dag",
       description="Attempts to reproduce hanging pod issue.",
       default_args={
           'owner': 'cloud_engineering',
           'email_on_failure': False, 
           'retries': 1,
           'retry_delay': timedelta(seconds=10),
       },
       start_date=START_DATE,
       end_date=END_DATE,
       schedule_interval=timedelta(seconds=30),
       catchup=True,
   )
   def dag():
       log_ids = fetch_new_log_ids()
   
       container_entry_point_commands = generate_entry_point_commands(log_ids)
   
       KubernetesPodOperator.partial(
           name="process_log_example_dag",
           namespace="composer-user-workloads",
           config_file="/home/airflow/composer_kube_config",
          kubernetes_conn_id="kubernetes_default",
           task_id="process_log",
           image="us-central1-docker.pkg.dev/project/images/example-image",
           get_logs=True,
           log_events_on_failure=True,
           do_xcom_push=False,
           volumes=VOLUMES,
           volume_mounts=VOLUME_MOUNTS,
           # GCS Fuse CSI driver relies on pod annotations to configure itself 
and the container sidecar it runs in.
           annotations={
               "gke-gcsfuse/volumes": "true",
               "gke-gcsfuse/ephemeral-storage-limit": "1Gi",
               "gke-gcsfuse/cpu-request": "500m",
               "gke-gcsfuse/memory-request": "1Gi",
               "gke-gcsfuse/ephemeral-storage-request": "1Gi",
           },
           container_resources=k8s.V1ResourceRequirements(
               limits={
                   'memory': "3Gi",
                   'cpu': "1",
                   'ephemeral-storage': "0Gi"
               },
               requests={
                   'memory': "3Gi",
                   'cpu': "1",
                   'ephemeral-storage': "0Gi"
               },
           ),
       ).expand(cmds=container_entry_point_commands)
   
   
   dag()
   ```
   
   The container runs a simple python script that writes 100 files to /out
   
   ```
   def main():
     for i in range(0,100):
       with open(f"/out/{i}.txt", "w", encoding="utf8") as f:
         f.write(f"{i}")
     raise Exception("oops")
   
   if __name__ == "__main__":
     main()
   ```
   
   
   ### Anything else
   
   I can work around the issue with:
   
   ```python
   class MyPodManager(PodManager):
   
       def __init__(self, **kwargs):
           super().__init__(**kwargs)
   
       def await_pod_completion(self, pod: V1Pod, istio_enabled: bool = False,
                                container_name: str = "base") -> V1Pod:
            while True and time.time() - start < 7200:
               remote_pod = self.read_pod(pod)
               if remote_pod.status.phase in PodPhase.terminal_states:
                   break
               if istio_enabled and container_is_completed(remote_pod, 
container_name):
                   break
               if container_is_completed(remote_pod, container_name):
                   # Always break if the container is completed, even if 
sidecar containers continue to stay up
                   # and keep the pod up.
                   self.log.info("Pod %s should terminate now", 
pod.metadata.name)
                   break
               self.log.info("Pod %s has phase %s", pod.metadata.name, 
remote_pod.status.phase)
               time.sleep(2)
           return remote_pod
   
   class MyKubernetesPodOperator(KubernetesPodOperator):
   
       def __init__(self, **kwargs):
           super().__init__(**kwargs)
   
       @cached_property
       def pod_manager(self) -> PodManager:
           return MyPodManager(kube_client=self.client, 
callbacks=self.callbacks,                 
progress_callback=self._progress_callback)
   
   ```
   
   ### Are you willing to submit PR?
   
   - [X] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to