This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new db2d73d Retry deadlocked transactions on deleting old rendered task
fields (#18616)
db2d73d is described below
commit db2d73d95e793e63e152692f216deec9b9d9bc85
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Sep 30 18:50:52 2021 +0200
Retry deadlocked transactions on deleting old rendered task fields (#18616)
The query that deletes old rendered task fields for MySQL
can occasionally deadlock because it is unnecessarily complex with
a subquery (due to features missing in MySQL). This change
adds DB retries to recover from the deadlock (retrying the
transaction is the recommended practice for MySQL).
Fixes: #18512
* Update airflow/models/renderedtifields.py
Co-authored-by: Kaxil Naik <[email protected]>
* Apply suggestions from code review
Co-authored-by: Ash Berlin-Taylor <[email protected]>
Co-authored-by: Kaxil Naik <[email protected]>
Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
airflow/models/renderedtifields.py | 34 ++++++++++++++++++++--------------
1 file changed, 20 insertions(+), 14 deletions(-)
diff --git a/airflow/models/renderedtifields.py
b/airflow/models/renderedtifields.py
index 0572c89..23b2b78 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -28,6 +28,7 @@ from airflow.models.base import ID_LEN, Base
from airflow.models.taskinstance import TaskInstance
from airflow.serialization.helpers import serialize_template_field
from airflow.settings import json
+from airflow.utils.retries import retry_db_transaction
from airflow.utils.session import provide_session
from airflow.utils.sqlalchemy import UtcDateTime
@@ -161,20 +162,7 @@ class RenderedTaskInstanceFields(Base):
tuple_(cls.dag_id, cls.task_id,
cls.execution_date).notin_(subq1),
).delete(synchronize_session=False)
elif session.bind.dialect.name in ["mysql"]:
- # Fetch Top X records given dag_id & task_id ordered by Execution
Date
- subq1 = tis_to_keep_query.subquery('subq1')
-
- # Second Subquery
- # Workaround for MySQL Limitation
(https://stackoverflow.com/a/19344141/5691525)
- # Limitation: This version of MySQL does not yet support
- # LIMIT & IN/ALL/ANY/SOME subquery
- subq2 = session.query(subq1.c.dag_id, subq1.c.task_id,
subq1.c.execution_date).subquery('subq2')
-
- session.query(cls).filter(
- cls.dag_id == dag_id,
- cls.task_id == task_id,
- tuple_(cls.dag_id, cls.task_id,
cls.execution_date).notin_(subq2),
- ).delete(synchronize_session=False)
+ cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id,
tis_to_keep_query)
else:
# Fetch Top X records given dag_id & task_id ordered by Execution
Date
tis_to_keep = tis_to_keep_query.all()
@@ -191,3 +179,21 @@ class RenderedTaskInstanceFields(Base):
]
session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)
+
+ @classmethod
+ @retry_db_transaction
+ def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id,
tis_to_keep_query):
+ # Fetch Top X records given dag_id & task_id ordered by Execution Date
+ subq1 = tis_to_keep_query.subquery('subq1')
+ # Second Subquery
+ # Workaround for MySQL Limitation
(https://stackoverflow.com/a/19344141/5691525)
+ # Limitation: This version of MySQL does not yet support
+ # LIMIT & IN/ALL/ANY/SOME subquery
+ subq2 = session.query(subq1.c.dag_id, subq1.c.task_id,
subq1.c.execution_date).subquery('subq2')
+ # This query might deadlock occasionally and it should be retried if
fails (see decorator)
+ session.query(cls).filter(
+ cls.dag_id == dag_id,
+ cls.task_id == task_id,
+ tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2),
+ ).delete(synchronize_session=False)
+ session.flush()