This is an automated email from the ASF dual-hosted git repository.
msumit pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dfcd0490755 Fix dag-processor crashing due to MySql deadlock errors
(#60166)
dfcd0490755 is described below
commit dfcd0490755962a7eab81b8936a3a90d001654c1
Author: Sumit Maheshwari <[email protected]>
AuthorDate: Mon Jan 12 22:22:42 2026 +0530
Fix dag-processor crashing due to MySql deadlock errors (#60166)
MySQL may throw deadlock errors when multiple DAG-Processor instances are
running. The issue is a fetch sub-query being used within a delete query,
which
is sometimes causing a deadlock in MySQL.
---------
Co-authored-by: Sumit Maheshwari <[email protected]>
Co-authored-by: Jarek Potiuk <[email protected]>
---
airflow-core/src/airflow/models/asset.py | 17 +++++++++++------
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/models/asset.py
b/airflow-core/src/airflow/models/asset.py
index b0184382d34..372bd65a2b0 100644
--- a/airflow-core/src/airflow/models/asset.py
+++ b/airflow-core/src/airflow/models/asset.py
@@ -112,12 +112,17 @@ def remove_references_to_deleted_dags(session: Session):
DagScheduleAssetAliasReference,
TaskOutletAssetReference,
]
- for model in models_to_check:
- session.execute(
- delete(model)
-
.where(model.dag_id.in_(select(DagModel.dag_id).where(DagModel.is_stale)))
- .execution_options(synchronize_session="fetch")
- )
+
+ # The queries need to be done in separate steps, because in the case of
multiple
+ # dag processors on MySQL, there could be a deadlock caused by acquiring
both an
+ # exclusive lock for deletion and shared lock for query in reverse sequence
+ if stale_dag_ids :=
session.scalars(select(DagModel.dag_id).where(DagModel.is_stale)).all():
+ for model in models_to_check:
+ session.execute(
+ delete(model)
+ .where(model.dag_id.in_(stale_dag_ids))
+ .execution_options(synchronize_session="fetch")
+ )
alias_association_table = Table(