fediabdelhedi commented on code in PR #56710:
URL: https://github.com/apache/airflow/pull/56710#discussion_r2435296545
##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -294,17 +294,50 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil
def _execute_task_callbacks(dagbag: DagBag, request: TaskCallbackRequest, log: FilteringBoundLogger) -> None:
+ # Check if this is a failure callback early
Review Comment:
Thanks for reviewing! I've simplified the approach based on your feedback.
What I changed:
- Removed the extra `ti is None` checks (assumes `request.ti` always exists)
- Now just using `dagbag.dags.get(dag_id)` for safe lookup
- Simpler, more minimal fix that addresses the core KeyError issue
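A minimal sketch of the guarded lookup described above, for illustration only. `FakeTI`, `FakeRequest`, `FakeDagBag`, and `execute_task_callbacks` are hypothetical stand-ins rather than Airflow classes or the actual PR code; the only detail taken from this comment is the use of `dags.get(dag_id)` in place of `dags[dag_id]`.

```python
import logging
from dataclasses import dataclass, field

log = logging.getLogger(__name__)


@dataclass
class FakeTI:
    """Hypothetical stand-in for the task instance carried by a request."""
    dag_id: str
    task_id: str


@dataclass
class FakeRequest:
    """Hypothetical stand-in for a task callback request."""
    ti: FakeTI


@dataclass
class FakeDagBag:
    """Hypothetical stand-in exposing only a `dags` mapping."""
    dags: dict = field(default_factory=dict)


def execute_task_callbacks(dagbag: FakeDagBag, request: FakeRequest) -> bool:
    """Return True if callbacks would run, False if the DAG is missing."""
    # .get() returns None for a missing key instead of raising KeyError.
    dag = dagbag.dags.get(request.ti.dag_id)
    if dag is None:
        log.warning("DAG %s not found in DagBag; skipping callbacks", request.ti.dag_id)
        return False
    # The real processor would resolve the task and invoke its callbacks here.
    return True
```

With this shape, a request for a DAG that has left the DagBag is logged and skipped rather than crashing the processor.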
Reproduction scenario (from #56701):
This occurs when there's a race condition between DAG parsing and callback
execution:
1. DAG Removed During Processing:
- Time T1: DAG `dag_123` exists, task fails, callback request queued
- Time T2: Operator deletes `dag_123.py` from disk
- Time T3: DagBag re-parses, `dag_123` no longer in `dagbag.dags`
- Time T4: Processor tries to execute callback for `dag_123`
- **Result:** `KeyError: 'dag_123'` at line `dag = dagbag.dags[request.ti.dag_id]`
2. Parse Delay:
- Large DAG file takes time to parse
- Callback request arrives before DAG is loaded into DagBag
- **Result:** Same `KeyError`
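The T1 to T4 timeline in scenario 1 can be sketched with a plain dict and a queue. This is a simplified, single-threaded simulation under assumed names (`simulate_race`, `dags`, `callback_queue` are hypothetical); it does not use Airflow itself and only shows why a direct subscript fails where `.get()` does not.

```python
from collections import deque


def simulate_race() -> tuple[str, object]:
    """Replay the T1-T4 timeline and report both lookup styles."""
    dags = {"dag_123": "parsed DAG object"}  # stand-in for dagbag.dags
    callback_queue: deque[str] = deque()

    # T1: the task fails and a callback request for dag_123 is queued.
    callback_queue.append("dag_123")

    # T2/T3: the file is deleted and the re-parse drops the DAG.
    dags.pop("dag_123")

    # T4: the processor picks up the queued request.
    dag_id = callback_queue.popleft()

    try:
        dags[dag_id]  # direct subscript, as in the failing line
        unsafe_outcome = "ran"
    except KeyError as exc:
        unsafe_outcome = f"KeyError: {exc}"

    safe_lookup = dags.get(dag_id)  # returns None instead of raising
    return unsafe_outcome, safe_lookup
```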
The original error from #56701: