This is an automated email from the ASF dual-hosted git repository.

msumit pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dfcd0490755 Fix dag-processor crashing due to MySql deadlock errors 
(#60166)
dfcd0490755 is described below

commit dfcd0490755962a7eab81b8936a3a90d001654c1
Author: Sumit Maheshwari <[email protected]>
AuthorDate: Mon Jan 12 22:22:42 2026 +0530

    Fix dag-processor crashing due to MySql deadlock errors (#60166)
    
    MySQL may throw deadlock errors when multiple DAG-Processor instances are
    running. The issue is a fetch sub-query being used within a delete query, 
which
    is sometimes causing a deadlock in MySQL.
    
    ---------
    
    Co-authored-by: Sumit Maheshwari <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow-core/src/airflow/models/asset.py | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/models/asset.py 
b/airflow-core/src/airflow/models/asset.py
index b0184382d34..372bd65a2b0 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -112,12 +112,17 @@ def remove_references_to_deleted_dags(session: Session):
         DagScheduleAssetAliasReference,
         TaskOutletAssetReference,
     ]
-    for model in models_to_check:
-        session.execute(
-            delete(model)
-            
.where(model.dag_id.in_(select(DagModel.dag_id).where(DagModel.is_stale)))
-            .execution_options(synchronize_session="fetch")
-        )
+
+    # The queries need to be done in separate steps, because in the case of 
multiple
+    # dag processors on MySQL, there could be a deadlock caused by acquiring 
both an
+    # exclusive lock for deletion and shared lock for query in reverse sequence
+    if stale_dag_ids := 
session.scalars(select(DagModel.dag_id).where(DagModel.is_stale)).all():
+        for model in models_to_check:
+            session.execute(
+                delete(model)
+                .where(model.dag_id.in_(stale_dag_ids))
+                .execution_options(synchronize_session="fetch")
+            )
 
 
 alias_association_table = Table(

Reply via email to