jaketf opened a new issue, #58968:
URL: https://github.com/apache/airflow/issues/58968

   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   all versions
   
   ### Apache Airflow version
   
   all versions
   
   ### Operating System
   
   linux
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   This can happen on any deployment type / executor, as it's an issue with the KubernetesPodOperator (KPO) itself.
   
   ### What happened
   
   tl;dr: KPO pods sometimes hang around long after their corresponding task instance (TI) has reached a terminal state.
   
   
   Sometimes the XCom sidecar never exits, leaving zombie KPO pods around indefinitely.
   This can also happen if the user defines a pod with a sidecar that never exits and ignores signals.
   
   A KubernetesPodOperator (KPO) pod is considered a zombie if any of the 
following apply:
     1. It doesn't match any active (non-terminal) task instance in the database
     2. The pod's `try_number` is less than the matching active task instance's 
current `try_number` (old retry pod)
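   Under that definition, the zombie check for a single pod reduces to a pure predicate. The sketch below is illustrative only; the `PodTI` shape and `is_zombie` helper are hypothetical, not an existing Airflow API:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PodTI:
    """Task-instance identity as read from a KPO pod's labels (hypothetical shape)."""
    dag_id: str
    task_id: str
    run_id: str
    map_index: int
    try_number: int


def is_zombie(pod: PodTI, active_tis: dict) -> bool:
    """Return True if the pod matches no active TI (rule 1) or is an old retry (rule 2).

    active_tis maps (dag_id, task_id, run_id, map_index) -> current try_number
    for all non-terminal task instances.
    """
    key = (pod.dag_id, pod.task_id, pod.run_id, pod.map_index)
    current_try = active_tis.get(key)
    if current_try is None:
        return True  # rule 1: no active TI matches this pod
    return pod.try_number < current_try  # rule 2: stale retry pod


# Example: a pod from try 1 while the TI is already on try 2 is a zombie.
active = {("d", "t", "r", -1): 2}
assert is_zombie(PodTI("d", "t", "r", -1, 1), active)       # old retry
assert not is_zombie(PodTI("d", "t", "r", -1, 2), active)   # current attempt
assert is_zombie(PodTI("d", "other", "r", -1, 1), active)   # no active TI
```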
   
   ### What you think should happen instead
   
   There should be some clean-up logic, perhaps in the scheduler rather than in the operator itself, to force-kill zombie KPO pods.
   
   
   similar to the logic that cleans up Kubernetes Executor worker pods (gated by the configurations below):
   `AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS`
   `AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE`
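   For illustration, a KPO-specific equivalent could take a similar shape. These configuration names are purely hypothetical; they do not exist in Airflow today and are shown only to sketch the proposal:

```shell
# Hypothetical settings, mirroring the Kubernetes Executor options above.
export AIRFLOW__KUBERNETES__DELETE_ZOMBIE_KPO_PODS="True"
export AIRFLOW__KUBERNETES__ZOMBIE_KPO_POD_CHECK_INTERVAL="300"  # seconds between sweeps
```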
   
   
   
   A proposed detection and clean-up algorithm:
   
   1. **List all KPO pods**: Uses the Kubernetes API with the label selector `kubernetes_pod_operator=True`
   2. **Extract TI details from pods**: For each pod, extracts normalized task 
instance details from labels (`dag_id`, `task_id`, `run_id`, `map_index`, 
`try_number`). Pod labels are already normalized by Airflow using 
`make_safe_label_value()`
   3. **Query active task instances**: Queries all active (non-terminal) task 
instances from the database in a single batch query, filtering for 
KubernetesPodOperator-related operators (KubernetesPodOperator, 
KubernetesPodOperatorAsync, SparkKubernetesPodOperator, etc., but excluding 
"external" EksPodOperator and GKEStartPodOperator)
   4. **Normalize DB values to pod label format**: Normalizes database values 
(`dag_id`, `task_id`, `run_id`, `map_index`) using `make_safe_label_value()` to 
match the format used in pod labels
   5. **Exact matching**: Compares normalized pod labels with normalized 
database values using exact matching
   6. **Identify zombies**: Pods that don't match any active task instance are 
considered zombies. Pods matching active TIs are checked for old retry scenarios
   7. **Terminate zombies**: Force deletes all identified zombie pods
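   The matching logic in steps 2, 4, 5, and 6 can be sketched in plain Python. This is a minimal sketch under stated assumptions: `normalize()` below is a simplified stand-in for Airflow's `make_safe_label_value()` (the real helper also appends a hash suffix when it has to change a value), and the pod label dicts and TI rows would come from the Kubernetes API and the metadata database respectively:

```python
import re

MAX_LABEL_LEN = 63  # Kubernetes label value length limit


def normalize(value: str) -> str:
    """Simplified stand-in for make_safe_label_value(): keep only
    label-safe characters and truncate to the Kubernetes limit."""
    safe = re.sub(r"[^a-zA-Z0-9._-]", "_", value)
    return safe[:MAX_LABEL_LEN]


def find_zombies(pod_labels: list, active_tis: list) -> list:
    """Normalize DB values, exact-match against pod labels, and return the
    pods with no matching active TI or with a stale try_number."""
    # Step 4: normalize DB values into the format used in pod labels.
    active = {
        (
            normalize(ti["dag_id"]),
            normalize(ti["task_id"]),
            normalize(ti["run_id"]),
            str(ti["map_index"]),
        ): ti["try_number"]
        for ti in active_tis
    }
    zombies = []
    for labels in pod_labels:
        # Step 5: exact match on the normalized identity tuple.
        key = (labels["dag_id"], labels["task_id"], labels["run_id"], labels["map_index"])
        current_try = active.get(key)
        # Step 6: no active TI, or an old retry pod, means zombie.
        if current_try is None or int(labels["try_number"]) < current_try:
            zombies.append(labels)
    return zombies


pods = [
    {"dag_id": "my_dag", "task_id": "t1", "run_id": "manual__2024", "map_index": "-1", "try_number": "1"},
    {"dag_id": "my_dag", "task_id": "t2", "run_id": "manual__2024", "map_index": "-1", "try_number": "1"},
]
tis = [{"dag_id": "my_dag", "task_id": "t1", "run_id": "manual__2024", "map_index": -1, "try_number": 1}]
print(find_zombies(pods, tis))  # only the t2 pod is a zombie
```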
   
   ### How to reproduce
   
   I've used the DAG below for testing this scenario.
   
   You can also manually kill the zombie pods' TIs to observe that the pods stick around.
   ```python
   from airflow import __version__ as airflow_version
   from airflow.decorators import dag
    from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   from kubernetes.client import models as k8s
   from pendulum import datetime
   
   
   def sleep_container(name: str, duration: int | str) -> k8s.V1Container:
       return k8s.V1Container(
           name=name,
           image="busybox:latest",
           command=["/bin/sh", "-c"],
           args=[f"sleep {duration}"],
       )
   
   
   def healthy_pod_spec() -> k8s.V1PodSpec:
       return k8s.V1PodSpec(
           containers=[
               sleep_container("base", "20"),
           ],
           termination_grace_period_seconds=86400,
           restart_policy="Never",
       )
   
   
   def zombie_pod_spec() -> k8s.V1PodSpec:
       return k8s.V1PodSpec(
           containers=[
               sleep_container("base", "5"),
               sleep_container("sidecar", "infinity"),
           ],
       )
   
   
   @dag(
       dag_id="kpo_zombie_dag",
       schedule=None,
       start_date=datetime(2024, 1, 1),
       catchup=False,
       tags=["test", "kpo", "zombie"],
   )
   def kpo_zombie_dag():
       zombie_task = KubernetesPodOperator(
           task_id="zombie_kpo_task",
           full_pod_spec=k8s.V1Pod(
               metadata=k8s.V1ObjectMeta(),
               spec=zombie_pod_spec(),
           ),
       )
       healthy_task = KubernetesPodOperator(
           task_id="healthy_kpo_task",
           full_pod_spec=k8s.V1Pod(
               metadata=k8s.V1ObjectMeta(),
               spec=healthy_pod_spec(),
           ),
       )
   
       tasks = [zombie_task, healthy_task]
   
        # Create 2 mapped KPO task instances, one zombie and one healthy
        # (only for Airflow >= 2.3.0, where dynamic task mapping was introduced)
       try:
           # Parse version string (e.g., "2.3.0" or "2.3.0+astro.1")
           version_parts = airflow_version.split("+")[0].split(".")
           major = int(version_parts[0])
           minor = int(version_parts[1]) if len(version_parts) > 1 else 0
   
           supports_mapped_tasks = major > 2 or (major == 2 and minor >= 3)
   
           if supports_mapped_tasks:
                # Use expand_kwargs to create 2 mapped task instances (map_index 0, 1),
               # one zombie and one healthy
               # Each dict in the list creates one mapped instance
               mapped_healthy_task = KubernetesPodOperator.partial(
                   task_id="mapped_kpo_task",
               ).expand_kwargs(
                   [
                       {
                           "full_pod_spec": k8s.V1Pod(
                               metadata=k8s.V1ObjectMeta(),
                               spec=zombie_pod_spec(),
                           ),
                       },
                       {
                           "full_pod_spec": k8s.V1Pod(
                               metadata=k8s.V1ObjectMeta(),
                               spec=healthy_pod_spec(),
                           ),
                       },
                   ]
               )
               tasks.append(mapped_healthy_task)
       except (ValueError, IndexError):
           # If version parsing fails, skip mapped tasks to be safe
           pass
   
       return tasks
   
   
   kpo_dag = kpo_zombie_dag()
   
   ```
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

