jaketf opened a new issue, #58968:
URL: https://github.com/apache/airflow/issues/58968
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
all versions
### Apache Airflow version
all versions
### Operating System
linux
### Deployment
Astronomer
### Deployment details
This can happen on any deployment type / executor, as it's an issue with KPO itself.
### What happened
tl;dr: KPO pods sometimes hang around long after their corresponding TI has reached a terminal state.

Sometimes the xcom sidecar never exits, leaving zombie KPO pods around indefinitely. This can also happen if the user defines a pod with a sidecar that never exits and ignores signals.
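For example, a user-defined sidecar along these lines (the container name, image, and command are just illustrations, not the actual xcom sidecar spec) never exits on its own and shrugs off SIGTERM, so the pod lingers until something force-deletes it:

```python
from kubernetes.client import models as k8s

# Hypothetical sidecar that never exits and ignores termination signals.
# This is NOT the KPO xcom sidecar, only an illustration of the failure mode
# described above.
stubborn_sidecar = k8s.V1Container(
    name="stubborn-sidecar",
    image="busybox:latest",
    command=["/bin/sh", "-c"],
    # trap '' TERM INT makes the shell ignore SIGTERM/SIGINT, so only a force
    # delete (grace_period_seconds=0 -> SIGKILL) actually removes the pod.
    args=["trap '' TERM INT; while true; do sleep 3600; done"],
)
```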
A KubernetesPodOperator (KPO) pod is considered a zombie if any of the
following apply:
1. It doesn't match any active (non-terminal) task instance in the database
2. The pod's `try_number` is less than the matching active task instance's
current `try_number` (old retry pod)
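A minimal sketch of that zombie test, assuming pod labels and active TIs have already been normalized to the same format (the function and argument names here are hypothetical):

```python
def is_zombie(pod_labels: dict[str, str], active_tis: dict[tuple, int]) -> bool:
    """Return True if a KPO pod no longer corresponds to a live task instance.

    ``active_tis`` is assumed to map (dag_id, task_id, run_id, map_index)
    to the current try_number of each active (non-terminal) task instance.
    """
    key = (
        pod_labels["dag_id"],
        pod_labels["task_id"],
        pod_labels["run_id"],
        pod_labels.get("map_index", "-1"),
    )
    current_try = active_tis.get(key)
    if current_try is None:
        # Condition 1: no active (non-terminal) TI matches this pod.
        return True
    # Condition 2: the pod belongs to an older retry of a still-active TI.
    return int(pod_labels.get("try_number", 0)) < current_try
```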
### What you think should happen instead
There should be some clean-up logic, perhaps in the scheduler rather than the operator cleaning up after itself, to force-kill zombie KPO pods, similar to the logic that cleans up Kubernetes Executor zombies (gated by the configurations below):
- `AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS`
- `AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS_ON_FAILURE`

A sketch of the proposed clean-up logic:
1. **List all KPO pods**: Uses Kubernetes API with label selector
`kubernetes_pod_operator=True`
2. **Extract TI details from pods**: For each pod, extracts normalized task
instance details from labels (`dag_id`, `task_id`, `run_id`, `map_index`,
`try_number`). Pod labels are already normalized by Airflow using
`make_safe_label_value()`
3. **Query active task instances**: Queries all active (non-terminal) task
instances from the database in a single batch query, filtering for
KubernetesPodOperator-related operators (KubernetesPodOperator,
KubernetesPodOperatorAsync, SparkKubernetesPodOperator, etc., but excluding
"external" EksPodOperator and GKEStartPodOperator)
4. **Normalize DB values to pod label format**: Normalizes database values
(`dag_id`, `task_id`, `run_id`, `map_index`) using `make_safe_label_value()` to
match the format used in pod labels
5. **Exact matching**: Compares normalized pod labels with normalized
database values using exact matching
6. **Identify zombies**: Pods that don't match any active task instance are
considered zombies. Pods matching active TIs are checked for old retry scenarios
7. **Terminate zombies**: Force deletes all identified zombie pods
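Very roughly, and assuming something with both metadata-DB and Kubernetes API access runs this periodically (a scheduler-adjacent job, say), the steps above could look like the sketch below. The operator allow-list, namespace handling, and the `make_safe_label_value` import path are assumptions/placeholders, not a final implementation:

```python
from kubernetes import client, config

from airflow.models import TaskInstance
from airflow.providers.cncf.kubernetes.pod_generator import make_safe_label_value
from airflow.utils.session import create_session
from airflow.utils.state import State

# Placeholder allow-list; a real implementation would cover the full set of
# KPO-derived operators described in step 3.
KPO_OPERATORS = {"KubernetesPodOperator", "KubernetesPodOperatorAsync"}


def reap_zombie_kpo_pods(namespace: str = "default") -> None:
    config.load_incluster_config()  # or load_kube_config() outside the cluster
    v1 = client.CoreV1Api()

    # 1. List all KPO pods via the label KPO attaches to pods it launches.
    pods = v1.list_namespaced_pod(
        namespace=namespace, label_selector="kubernetes_pod_operator=True"
    ).items

    # 3. Query active (non-terminal) TIs for KPO-like operators in one batch.
    with create_session() as session:
        active = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.state.in_(State.unfinished),
                TaskInstance.operator.in_(KPO_OPERATORS),
            )
            .all()
        )
        # 4. Normalize DB values the same way pod labels are built.
        active_tis = {
            (
                make_safe_label_value(ti.dag_id),
                make_safe_label_value(ti.task_id),
                make_safe_label_value(ti.run_id),
                str(ti.map_index),
            ): ti.try_number
            for ti in active
        }

    for pod in pods:
        # 2. Extract normalized TI details from the pod's labels.
        labels = pod.metadata.labels or {}
        key = (
            labels.get("dag_id"),
            labels.get("task_id"),
            labels.get("run_id"),
            labels.get("map_index", "-1"),
        )
        # 5./6. Exact match against active TIs; no match, or an older
        # try_number than the active TI's, means the pod is a zombie.
        current_try = active_tis.get(key)
        is_zombie = current_try is None or int(labels.get("try_number", 0)) < current_try
        if is_zombie:
            # 7. Force-delete the zombie pod.
            v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace=pod.metadata.namespace,
                grace_period_seconds=0,
            )
```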
### How to reproduce
I've used the DAG below for testing this scenario; you can manually kill the zombie pod TIs.
```python
from airflow import __version__ as airflow_version
from airflow.decorators import dag
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes.client import models as k8s
from pendulum import datetime


def sleep_container(name: str, duration: int | str) -> k8s.V1Container:
    return k8s.V1Container(
        name=name,
        image="busybox:latest",
        command=["/bin/sh", "-c"],
        args=[f"sleep {duration}"],
    )


def healthy_pod_spec() -> k8s.V1PodSpec:
    return k8s.V1PodSpec(
        containers=[
            sleep_container("base", "20"),
        ],
        termination_grace_period_seconds=86400,
        restart_policy="Never",
    )


def zombie_pod_spec() -> k8s.V1PodSpec:
    return k8s.V1PodSpec(
        containers=[
            sleep_container("base", "5"),
            sleep_container("sidecar", "infinity"),
        ],
    )


@dag(
    dag_id="kpo_zombie_dag",
    schedule=None,
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["test", "kpo", "zombie"],
)
def kpo_zombie_dag():
    zombie_task = KubernetesPodOperator(
        task_id="zombie_kpo_task",
        full_pod_spec=k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(),
            spec=zombie_pod_spec(),
        ),
    )

    healthy_task = KubernetesPodOperator(
        task_id="healthy_kpo_task",
        full_pod_spec=k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(),
            spec=healthy_pod_spec(),
        ),
    )

    tasks = [zombie_task, healthy_task]

    # Create 2 mapped KPO tasks, one zombie and one healthy (only for Airflow >= 2.3.0)
    # Mapped tasks were introduced in Airflow 2.3.0
    try:
        # Parse version string (e.g., "2.3.0" or "2.3.0+astro.1")
        version_parts = airflow_version.split("+")[0].split(".")
        major = int(version_parts[0])
        minor = int(version_parts[1]) if len(version_parts) > 1 else 0
        supports_mapped_tasks = major > 2 or (major == 2 and minor >= 3)
        if supports_mapped_tasks:
            # Use expand_kwargs to create 2 mapped task instances (map_index 0, 1),
            # one zombie and one healthy
            # Each dict in the list creates one mapped instance
            mapped_healthy_task = KubernetesPodOperator.partial(
                task_id="mapped_kpo_task",
            ).expand_kwargs(
                [
                    {
                        "full_pod_spec": k8s.V1Pod(
                            metadata=k8s.V1ObjectMeta(),
                            spec=zombie_pod_spec(),
                        ),
                    },
                    {
                        "full_pod_spec": k8s.V1Pod(
                            metadata=k8s.V1ObjectMeta(),
                            spec=healthy_pod_spec(),
                        ),
                    },
                ]
            )
            tasks.append(mapped_healthy_task)
    except (ValueError, IndexError):
        # If version parsing fails, skip mapped tasks to be safe
        pass

    return tasks


kpo_dag = kpo_zombie_dag()
```
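To confirm the repro, once the TIs above reach a terminal state (or you mark them failed manually), anything still returned by the label selector below is a zombie. A quick check along these lines (the namespace and kubeconfig loading are assumptions about your setup):

```python
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

# KPO labels every pod it launches with kubernetes_pod_operator=True, so any
# pod still Running here after its TI finished is a zombie.
for pod in v1.list_namespaced_pod(
    namespace="default", label_selector="kubernetes_pod_operator=True"
).items:
    print(pod.metadata.name, pod.status.phase)
```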
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)