paultmathew opened a new issue, #67296:
URL: https://github.com/apache/airflow/issues/67296

   ### Under which category would you file this issue?
   
   Providers
   
   ### Apache Airflow version
   
   3.2.1
   
   ### What happened and how to reproduce it?
   
   `KubernetesPodTrigger.safe_to_cancel()` calls `get_task_state()`, which calls
   `RuntimeTaskInstance.get_task_states(...)` against the execution API's
   `/states` endpoint and looks the response up by plain `task_id`:
   
   ```python
   # providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
   return task_states_response[self.task_instance.run_id][self.task_instance.task_id]
   ```
   
   But the API endpoint encodes the response key *differently* for mapped TIs
   (one entry per `(task_id, map_index)` pair):
   
   ```python
   # airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
   [
       run_id_task_state_map[task.run_id].update(
           {task.task_id: task.state}
           if task.map_index < 0
           else {f"{task.task_id}_{task.map_index}": task.state}
       )
       for task in results
   ]
   ```
   
   So for any mapped deferrable `KubernetesPodOperator` task — for example a
   `KubernetesPodOperator(..., deferrable=True).expand(...)` or one nested
   inside a `@task_group.expand(...)` — the trigger's lookup with
   `task_id="map_group.task_a"` and `map_index=2` looks for key
   `"map_group.task_a"`, but the response only contains
   `"map_group.task_a_2"`. `KeyError` → wrapped in `AirflowException` →
   `cleanup()`'s broad `except Exception` defensively skips
   `hook.delete_pod()` → the pod is never deleted on user-mark-failed,
   keeping it alive until `active_deadline_seconds` expires (often hours).
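
The mismatch can be reproduced without Airflow. The following is a minimal sketch, assuming a stand-in `FakeTI` dataclass and a hypothetical `build_states_response` helper that mirrors only the key construction quoted above; neither name exists in Airflow, and the run id and state are made-up values.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class FakeTI:
    """Hypothetical stand-in for a task instance row; not an Airflow class."""

    run_id: str
    task_id: str
    map_index: int
    state: str


def build_states_response(results):
    """Mirror the key construction of the /states endpoint quoted above."""
    run_id_task_state_map = defaultdict(dict)
    for task in results:
        key = task.task_id if task.map_index < 0 else f"{task.task_id}_{task.map_index}"
        run_id_task_state_map[task.run_id][key] = task.state
    return dict(run_id_task_state_map)


ti = FakeTI(run_id="manual__run", task_id="map_group.task_a", map_index=2, state="failed")
response = build_states_response([ti])

# The trigger's current lookup uses the plain task_id and misses the entry.
try:
    response[ti.run_id][ti.task_id]
    lookup_failed = False
except KeyError:
    lookup_failed = True

# The suffixed key is the one that is actually present in the response.
suffixed_state = response[ti.run_id][f"{ti.task_id}_{ti.map_index}"]
```

Here `lookup_failed` ends up `True` while the suffixed lookup returns the stored state, which is the same shape of failure as the `KeyError: 'map_group.task_a'` in the log below.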
   
   Log from a real staging run (Airflow 3.2.1 +
   `apache-airflow-providers-cncf-kubernetes==10.16.0`):
   
   ```
   [2026-05-21T17:46:25.889914Z] WARNING - Could not determine task state
   during cleanup; skipping pod deletion to be safe.
   AirflowException: ('TaskInstance with dag_id: %s, task_id: %s, run_id: %s
   and map_index: %s is not found', 'platform_dag_behavior_test',
   'map_group.task_a', 'manual__2026-05-21T17:43:39.041740+00:00', 2)
       File pod.py, line 434 in cleanup
       File pod.py, line 420 in safe_to_cancel
       File pod.py, line 401 in get_task_state
   ,KeyError: 'map_group.task_a'
       File pod.py, line 399 in get_task_state
   ```
   
   The same trigger code is present on `main`, so the bug is not fixed there.
   
   ### Impact
   
   Affects every mapped deferrable KPO task on Airflow 3.x with
   `apache-airflow-providers-cncf-kubernetes >= 10.15.0` (which is when
   `safe_to_cancel` was introduced in #62401). Silently breaks the
   mark-failed-deletes-pod contract — operators of mapped deferred KPO tasks
   who hit "Mark Failed" in the UI see the pod stay `Running` until the pod
   deadline. For long-lived deferrable tasks (e.g. continuous stream
   pollers) this can cause overlapping-writer races against external
   systems (Iceberg commits in our case).
   
   ### What you think should happen instead?
   
   `get_task_state()` should compose the lookup key the same way the API
   server encodes it, appending the `_{map_index}` suffix when the TI is
   mapped. That makes the lookup succeed for mapped TIs and produces the
   correct `safe_to_cancel()` result, which lets `cleanup()` call
   `hook.delete_pod()` so that Mark Failed actually terminates the pod.
   
   Suggested diff (matches the patch we are running as a local plugin
   workaround):
   
   ```diff
   --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
   +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
   @@ async def get_task_state(self):
                task_states_response = await sync_to_async(RuntimeTaskInstance.get_task_states)(
                    dag_id=self.task_instance.dag_id,
                    task_ids=[self.task_instance.task_id],
                    run_ids=[self.task_instance.run_id],
                    map_index=self.task_instance.map_index,
                )
   +            # The /states endpoint suffixes the response key with
   +            # `_{map_index}` for mapped TIs (see the dict-key construction
   +            # in airflow-core/.../execution_api/routes/task_instances.py).
   +            ti_key = (
   +                f"{self.task_instance.task_id}_{self.task_instance.map_index}"
   +                if self.task_instance.map_index >= 0
   +                else self.task_instance.task_id
   +            )
                try:
   -                return task_states_response[self.task_instance.run_id][self.task_instance.task_id]
   +                return task_states_response[self.task_instance.run_id][ti_key]
                except KeyError:
                    raise AirflowException(
                        "TaskInstance with dag_id: %s, task_id: %s, run_id: %s and map_index: %s is not found",
                        self.task_instance.dag_id,
                        self.task_instance.task_id,
                        self.task_instance.run_id,
                        self.task_instance.map_index,
                    )
   ```
   
   Plus a unit test in
   `providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py`
   covering both the mapped and non-mapped branches.
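
A rough sketch of what such a test could assert, assuming the key composition is factored into a hypothetical helper `compose_ti_key` (the diff above inlines it in `get_task_state()` instead, so this helper and the sample response dicts are illustrations only):

```python
def compose_ti_key(task_id: str, map_index: int) -> str:
    """Hypothetical helper: build the key the /states response uses for a TI."""
    return f"{task_id}_{map_index}" if map_index >= 0 else task_id


def test_mapped_ti_key_has_map_index_suffix():
    # Mapped TI: the response key carries the `_{map_index}` suffix.
    response = {"run_1": {"map_group.task_a_2": "failed"}}
    key = compose_ti_key("map_group.task_a", 2)
    assert response["run_1"][key] == "failed"


def test_non_mapped_ti_key_is_plain_task_id():
    # Non-mapped TI (map_index == -1): the response key is the plain task_id.
    response = {"run_1": {"task_a": "running"}}
    key = compose_ti_key("task_a", -1)
    assert response["run_1"][key] == "running"
```

The actual test in `test_pod.py` would more likely mock `RuntimeTaskInstance.get_task_states` and await `get_task_state()` on a trigger instance; the sketch only pins down the two branches that need coverage.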
   
   ---
   
   ## How to reproduce
   
   Minimal repro DAG:
   
   ```python
   from datetime import datetime, timezone
   from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
   from airflow.sdk import dag, task_group
   
   
   @task_group(group_id="map_group")
   def group(idx: int):
       KubernetesPodOperator(
           task_id="probe",
           image="docker.io/library/alpine:3.20",
           cmds=["/bin/sh", "-c"],
           arguments=[f'echo idx={idx}; sleep 600; exit 0'],
           deferrable=True,
           on_kill_action="delete_pod",
       )
   
   
   @dag(
       dag_id="kpo_mapped_safe_to_cancel_bug",
       schedule=None,
       start_date=datetime(2026, 1, 1, tzinfo=timezone.utc),
       catchup=False,
   )
   def bug_repro():
       group.expand(idx=[0, 1, 2])
   
   
   bug_repro()
   ```
   
   Steps:
   
   1. Trigger the DAG.
   2. Wait for the three `map_group.probe` mapped TIs to enter DEFERRED state
      (the pods are sleeping).
   3. Mark the dag run as **Failed** in the UI.
   4. Observe: pods stay `Running` for the full 600s sleep instead of being
      deleted. The triggerer log contains the `Could not determine task
      state during cleanup; skipping pod deletion to be safe.` warning from
      the snippet above.
   
   Removing the `.expand(...)` (i.e. a single non-mapped `KubernetesPodOperator`)
   makes the bug go away: non-mapped TIs use the plain `task_id` key in the
   API response, so the trigger's lookup succeeds.
   
   ### Operating System
   
   Container base: `apache/airflow:slim-3.2.1-python3.12`, running on Amazon Linux 2 nodes (EKS, ARM64 / Graviton). Not OS-specific: the mismatch is purely Python-level between the trigger and the execution API.
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Apache Airflow Provider(s)
   
   cncf-kubernetes
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-cncf-kubernetes==10.16.0
   
   ### Official Helm Chart version
   
   1.19.0
   
   ### Kubernetes Version
   
   Amazon EKS 1.33 (Kubernetes 1.33)
   
   ### Helm Chart configuration
   
   Not Applicable
   
   ### Docker Image customizations
   
   Not Applicable
   
   ### Anything else?
   
   Latent since `apache-airflow-providers-cncf-kubernetes==10.15.0`, where
   the `safe_to_cancel` mechanism was introduced in #62401. Before 10.15
   the trigger had no `cleanup()` / `safe_to_cancel()` and so the lookup
   mismatch had no observable effect (the pod also wasn't deleted, but for
   unrelated reasons).
   
   We are currently running a local Airflow plugin that monkey-patches
   `KubernetesPodTrigger.get_task_state` with the diff above as a
   workaround. Happy to share if useful — but the upstream fix is small
   enough that a plugin shouldn't be needed long-term.
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

