paultmathew opened a new issue, #67296:
URL: https://github.com/apache/airflow/issues/67296
### Under which category would you file this issue?
Providers
### Apache Airflow version
3.2.1
### What happened and how to reproduce it?
`KubernetesPodTrigger.safe_to_cancel()` calls `get_task_state()`, which calls
`RuntimeTaskInstance.get_task_states(...)` against the execution API's
`/states` endpoint and looks the response up by plain `task_id`:
```python
# providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
return task_states_response[self.task_instance.run_id][self.task_instance.task_id]
```
But the API endpoint encodes the response key *differently* for mapped TIs
(one entry per `(task_id, map_index)` pair):
```python
# airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
[
    run_id_task_state_map[task.run_id].update(
        {task.task_id: task.state}
        if task.map_index < 0
        else {f"{task.task_id}_{task.map_index}": task.state}
    )
    for task in results
]
```
So the lookup fails for any mapped deferrable `KubernetesPodOperator` task,
for example a `KubernetesPodOperator(..., deferrable=True).expand(...)` or one
nested inside a `@task_group.expand(...)`. With `task_id="map_group.task_a"`
and `map_index=2`, the trigger looks for key `"map_group.task_a"`, but the
response only contains `"map_group.task_a_2"`. The resulting `KeyError` is
wrapped in an `AirflowException`, and `cleanup()`'s broad `except Exception`
then defensively skips `hook.delete_pod()`. As a result the pod is never
deleted when a user marks the task failed, and it stays alive until
`active_deadline_seconds` expires (often hours).
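The mismatch can be reproduced without Airflow. The sketch below mimics the key encoding quoted above and then performs the trigger's plain-`task_id` lookup. `FakeTI` and `encode_states` are hypothetical stand-ins introduced for illustration; they are not the real ORM model or route handler.

```python
from collections import defaultdict
from typing import NamedTuple


class FakeTI(NamedTuple):
    """Hypothetical stand-in for a task instance row returned by the query."""

    run_id: str
    task_id: str
    map_index: int
    state: str


def encode_states(results: list[FakeTI]) -> dict[str, dict[str, str]]:
    """Mimic the quoted encoding: plain task_id when map_index < 0,
    otherwise task_id suffixed with _{map_index}."""
    run_id_task_state_map: dict[str, dict[str, str]] = defaultdict(dict)
    for task in results:
        key = task.task_id if task.map_index < 0 else f"{task.task_id}_{task.map_index}"
        run_id_task_state_map[task.run_id][key] = task.state
    return dict(run_id_task_state_map)


response = encode_states([FakeTI("run_1", "map_group.task_a", 2, "failed")])

# The trigger's current lookup uses the plain task_id, which is absent
# from the response for a mapped TI.
try:
    response["run_1"]["map_group.task_a"]
    lookup_failed = False
except KeyError:
    lookup_failed = True
```
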
Log from a real staging run (Airflow 3.2.1 +
`apache-airflow-providers-cncf-kubernetes==10.16.0`):
```
[2026-05-21T17:46:25.889914Z] WARNING - Could not determine task state
during cleanup; skipping pod deletion to be safe.
AirflowException: ('TaskInstance with dag_id: %s, task_id: %s, run_id: %s
and map_index: %s is not found', 'platform_dag_behavior_test',
'map_group.task_a', 'manual__2026-05-21T17:43:39.041740+00:00', 2)
File pod.py, line 434 in cleanup
File pod.py, line 420 in safe_to_cancel
File pod.py, line 401 in get_task_state
,KeyError: 'map_group.task_a'
File pod.py, line 399 in get_task_state
```
The same trigger code is present on `main`, so the bug is not fixed there.
### Impact
Affects every mapped deferrable KPO task on Airflow 3.x with
`apache-airflow-providers-cncf-kubernetes >= 10.15.0` (which is when
`safe_to_cancel` was introduced in #62401). Silently breaks the
mark-failed-deletes-pod contract — operators of mapped deferred KPO tasks
who hit "Mark Failed" in the UI see the pod stay `Running` until the pod
deadline. For long-lived deferrable tasks (e.g. continuous stream
pollers) this can cause overlapping-writer races against external
systems (Iceberg commits in our case).
### What you think should happen instead?
`get_task_state()` should compose the lookup key the same way the API
server encodes it — appending the `_{map_index}` suffix when the TI is
mapped. That makes the lookup succeed for mapped TIs and produces the
correct `safe_to_cancel()` result, which lets `cleanup()` call
`hook.delete_pod()` so that Mark Failed actually terminates the pod.
Suggested diff (matches the patch we are running as a local plugin
workaround):
```diff
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ async def get_task_state(self):
         task_states_response = await sync_to_async(RuntimeTaskInstance.get_task_states)(
             dag_id=self.task_instance.dag_id,
             task_ids=[self.task_instance.task_id],
             run_ids=[self.task_instance.run_id],
             map_index=self.task_instance.map_index,
         )
+        # The /states endpoint suffixes the response key with
+        # `_{map_index}` for mapped TIs (see the dict-key construction
+        # in airflow-core/.../execution_api/routes/task_instances.py).
+        ti_key = (
+            f"{self.task_instance.task_id}_{self.task_instance.map_index}"
+            if self.task_instance.map_index >= 0
+            else self.task_instance.task_id
+        )
         try:
-            return task_states_response[self.task_instance.run_id][self.task_instance.task_id]
+            return task_states_response[self.task_instance.run_id][ti_key]
         except KeyError:
             raise AirflowException(
                 "TaskInstance with dag_id: %s, task_id: %s, run_id: %s and map_index: %s is not found",
                 self.task_instance.dag_id,
                 self.task_instance.task_id,
                 self.task_instance.run_id,
                 self.task_instance.map_index,
             )
```
Plus a unit test in
`providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py`
covering both the mapped and non-mapped branches.
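A rough sketch of what such a test could assert, written against a standalone helper rather than the real trigger. `compose_state_key` and `lookup_state` are hypothetical names introduced here, and `LookupError` stands in for `AirflowException`. The actual test would presumably exercise `KubernetesPodTrigger.get_task_state` with `RuntimeTaskInstance.get_task_states` mocked.

```python
def compose_state_key(task_id: str, map_index: int) -> str:
    """Build the response key the same way the /states endpoint does."""
    return f"{task_id}_{map_index}" if map_index >= 0 else task_id


def lookup_state(response: dict, run_id: str, task_id: str, map_index: int) -> str:
    """Look up a TI state, raising a descriptive error when it is absent."""
    try:
        return response[run_id][compose_state_key(task_id, map_index)]
    except KeyError:
        raise LookupError(
            f"TaskInstance with task_id: {task_id}, run_id: {run_id} "
            f"and map_index: {map_index} is not found"
        ) from None


def test_mapped_ti_uses_suffixed_key():
    response = {"run_1": {"map_group.task_a_2": "failed"}}
    assert lookup_state(response, "run_1", "map_group.task_a", 2) == "failed"


def test_unmapped_ti_uses_plain_key():
    response = {"run_1": {"task_a": "running"}}
    assert lookup_state(response, "run_1", "task_a", -1) == "running"


def test_missing_ti_raises():
    response = {"run_1": {"map_group.task_a_1": "running"}}
    try:
        lookup_state(response, "run_1", "map_group.task_a", 2)
    except LookupError:
        return
    raise AssertionError("expected LookupError for a missing TI")
```
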
---
## How to reproduce
Minimal repro DAG:
```python
from datetime import datetime, timezone

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.sdk import dag, task_group


@task_group(group_id="map_group")
def group(idx: int):
    KubernetesPodOperator(
        task_id="probe",
        image="docker.io/library/alpine:3.20",
        cmds=["/bin/sh", "-c"],
        arguments=[f'echo idx={idx}; sleep 600; exit 0'],
        deferrable=True,
        on_kill_action="delete_pod",
    )


@dag(
    dag_id="kpo_mapped_safe_to_cancel_bug",
    schedule=None,
    start_date=datetime(2026, 1, 1, tzinfo=timezone.utc),
    catchup=False,
)
def bug_repro():
    group.expand(idx=[0, 1, 2])


bug_repro()
```
Steps:
1. Trigger the DAG.
2. Wait for the three `map_group.probe` mapped TIs to enter DEFERRED state
(the pods are sleeping).
3. Mark the dag run as **Failed** in the UI.
4. Observe: pods stay `Running` for the full 600s sleep instead of being
deleted. The triggerer log contains the `Could not determine task
state during cleanup; skipping pod deletion to be safe.` warning from
the snippet above.
Removing the `.expand(...)` (i.e. a single non-mapped
`KubernetesPodOperator`)
makes the bug go away — non-mapped TIs use the plain `task_id` key in the
API response, so the trigger's lookup succeeds.
### Operating System
Container base: `apache/airflow:slim-3.2.1-python3.12`, running on Amazon
Linux 2 nodes (EKS, ARM64 / Graviton). Not OS-specific — the mismatch is purely
Python-level between the trigger and the execution API.
### Deployment
Official Apache Airflow Helm Chart
### Apache Airflow Provider(s)
cncf-kubernetes
### Versions of Apache Airflow Providers
apache-airflow-providers-cncf-kubernetes==10.16.0
### Official Helm Chart version
1.19.0
### Kubernetes Version
Amazon EKS 1.33 (Kubernetes 1.33)
### Helm Chart configuration
Not Applicable
### Docker Image customizations
Not Applicable
### Anything else?
Latent since `apache-airflow-providers-cncf-kubernetes==10.15.0`, where
the `safe_to_cancel` mechanism was introduced in #62401. Before 10.15
the trigger had no `cleanup()` / `safe_to_cancel()` and so the lookup
mismatch had no observable effect (the pod also wasn't deleted, but for
unrelated reasons).
We are currently running a local Airflow plugin that monkey-patches
`KubernetesPodTrigger.get_task_state` with the diff above as a
workaround. Happy to share if useful — but the upstream fix is small
enough that a plugin shouldn't be needed long-term.
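For readers unfamiliar with the approach, the shape of such a monkey-patch is roughly as follows. This is an illustrative sketch only, not the plugin itself: `StandInTrigger` and its `fetch_states` method are hypothetical stand-ins so the example runs without Airflow, whereas the real patch targets `KubernetesPodTrigger.get_task_state` and fetches states through `RuntimeTaskInstance.get_task_states`.

```python
import asyncio


class StandInTrigger:
    """Hypothetical stand-in for the trigger class; not the provider's code."""

    def __init__(self, run_id: str, task_id: str, map_index: int, response: dict):
        self.run_id = run_id
        self.task_id = task_id
        self.map_index = map_index
        self._response = response

    async def fetch_states(self) -> dict:
        # Placeholder for the execution API call.
        return self._response

    async def get_task_state(self) -> str:
        # Unpatched behavior: looks up the plain task_id.
        states = await self.fetch_states()
        return states[self.run_id][self.task_id]


async def patched_get_task_state(self) -> str:
    """Replacement that composes the key with the _{map_index} suffix."""
    states = await self.fetch_states()
    key = f"{self.task_id}_{self.map_index}" if self.map_index >= 0 else self.task_id
    return states[self.run_id][key]


# Swap the method on the class at import time, as a plugin module would.
StandInTrigger.get_task_state = patched_get_task_state

trigger = StandInTrigger(
    "run_1", "map_group.task_a", 2, {"run_1": {"map_group.task_a_2": "failed"}}
)
state = asyncio.run(trigger.get_task_state())
```

Because the replacement is assigned on the class, every trigger instance created afterwards picks it up without any change to DAG code.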
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)