fediabdelhedi commented on code in PR #56710:
URL: https://github.com/apache/airflow/pull/56710#discussion_r2435296545


##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -294,17 +294,50 @@ def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: Fil
 
 
 def _execute_task_callbacks(dagbag: DagBag, request: TaskCallbackRequest, log: FilteringBoundLogger) -> None:
+    # Check if this is a failure callback early

Review Comment:
   Thanks for reviewing! I've simplified the approach based on your feedback.
   
   What I changed:
   - Removed the extra `ti is None` checks (the code now assumes `request.ti` is always set)
   - Switched to `dagbag.dags.get(dag_id)`, which returns `None` for a missing DAG instead of raising
   - The result is a smaller fix that addresses the core `KeyError` issue
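   
   As a rough illustration of the lookup pattern (not the actual PR diff), here is a minimal sketch that uses a plain dict as a stand-in for `dagbag.dags`. The helper name `find_dag` and the log message are hypothetical and exist only for this example.

```python
import logging

log = logging.getLogger(__name__)


def find_dag(dags: dict, dag_id: str):
    """Return the DAG registered under dag_id, or None if it is missing.

    `dags` is a plain dict standing in for `dagbag.dags`; `find_dag` is a
    hypothetical helper, not an Airflow function.
    """
    dag = dags.get(dag_id)  # .get() returns None instead of raising KeyError
    if dag is None:
        log.warning("DAG %s not found; skipping callback", dag_id)
    return dag


dags = {"dag_abc": object()}          # only dag_abc is currently parsed
found = find_dag(dags, "dag_abc")     # present: the stored object is returned
missing = find_dag(dags, "dag_123")   # absent: None, and no exception
```

   The caller can then return early when the result is `None` rather than letting the exception propagate.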
   
   Reproduction scenario (from #56701):
   
   This occurs when there's a race condition between DAG parsing and callback execution:
   
   1. DAG Removed During Processing:
      - Time T1: DAG `dag_123` exists, task fails, callback request queued
      - Time T2: Operator deletes `dag_123.py` from disk
      - Time T3: DagBag re-parses, `dag_123` no longer in `dagbag.dags`
      - Time T4: Processor tries to execute callback for `dag_123`
      - **Result:** `KeyError: 'dag_123'` at line `dag = dagbag.dags[request.ti.dag_id]`
   
   2. Parse Delay:
      - Large DAG file takes time to parse
      - Callback request arrives before DAG is loaded into DagBag
      - **Result:** Same `KeyError`
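   
   The T1 to T4 timeline from the first scenario can be mimicked without Airflow. This is only a sketch: `FakeTI` and `FakeRequest` are hypothetical stand-ins for the task instance and callback request, and a plain dict plays the role of `dagbag.dags`.

```python
from dataclasses import dataclass


@dataclass
class FakeTI:
    """Hypothetical stand-in for the task instance carried by the request."""
    dag_id: str


@dataclass
class FakeRequest:
    """Hypothetical stand-in for a task callback request."""
    ti: FakeTI


# T1: the DAG is parsed and a callback request is queued for it.
dags = {"dag_123": "parsed DAG object"}
request = FakeRequest(ti=FakeTI(dag_id="dag_123"))

# T2/T3: the file is deleted and the re-parse drops the DAG from the mapping.
dags.pop("dag_123")

# T4: the subscript lookup raises because the key is gone.
try:
    dag = dags[request.ti.dag_id]
    outcome = "callback executed"
except KeyError as exc:
    outcome = f"KeyError: {exc}"

# The same request handled with .get() yields None instead of raising.
safe_dag = dags.get(request.ti.dag_id)
```

   The second scenario (parse delay) fails the same way, since the key is absent at lookup time in both cases.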
   
   The original error from #56701:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
