This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 030ada1751d [v3-1-test] Fix dag-processor crashing due to MySql 
deadlock errors (#60166) (#60418)
030ada1751d is described below

commit 030ada1751d6efde8b59a4e3e5ab58880a2377be
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jan 14 12:34:28 2026 +0100

    [v3-1-test] Fix dag-processor crashing due to MySql deadlock errors 
(#60166) (#60418)
    
    MySQL may throw deadlock errors when multiple DAG-Processor instances are
    running. The issue is a fetch sub-query being used within a delete query, 
which
    is sometimes causing a deadlock in MySQL.
    
    ---------
    (cherry picked from commit dfcd0490755962a7eab81b8936a3a90d001654c1)
    
    Co-authored-by: Sumit Maheshwari <[email protected]>
    Co-authored-by: Sumit Maheshwari <[email protected]>
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 airflow-core/src/airflow/models/asset.py | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/models/asset.py 
b/airflow-core/src/airflow/models/asset.py
index 8b28f2bea61..650cb8589b4 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -108,12 +108,17 @@ def remove_references_to_deleted_dags(session: Session):
         DagScheduleAssetAliasReference,
         TaskOutletAssetReference,
     ]
-    for model in models_to_check:
-        session.execute(
-            delete(model)
-            
.where(model.dag_id.in_(select(DagModel.dag_id).where(DagModel.is_stale)))
-            .execution_options(synchronize_session="fetch")
-        )
+
+    # The queries need to be done in separate steps, because in the case of 
multiple
+    # dag processors on MySQL, there could be a deadlock caused by acquiring 
both an
+    # exclusive lock for deletion and shared lock for query in reverse sequence
+    if stale_dag_ids := 
session.scalars(select(DagModel.dag_id).where(DagModel.is_stale)).all():
+        for model in models_to_check:
+            session.execute(
+                delete(model)
+                .where(model.dag_id.in_(stale_dag_ids))
+                .execution_options(synchronize_session="fetch")
+            )
 
 
 alias_association_table = Table(

Reply via email to