This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new db2d73d  Retry deadlocked transactions on deleting old rendered task fields (#18616)
db2d73d is described below

commit db2d73d95e793e63e152692f216deec9b9d9bc85
Author: Jarek Potiuk <[email protected]>
AuthorDate: Thu Sep 30 18:50:52 2021 +0200

    Retry deadlocked transactions on deleting old rendered task fields (#18616)
    
    
    
    The query that deletes old rendered task fields for MySQL can
    occasionally deadlock because it is unnecessarily complex, using a
    subquery (due to features missing in MySQL). This change adds DB
    retries to get rid of the deadlocks (as is the recommended practice
    for MySQL).
    
    Fixes: #18512
    
    * Update airflow/models/renderedtifields.py
    
    Co-authored-by: Kaxil Naik <[email protected]>
    
    * Apply suggestions from code review
    
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
    
    Co-authored-by: Kaxil Naik <[email protected]>
    Co-authored-by: Ash Berlin-Taylor <[email protected]>
---
 airflow/models/renderedtifields.py | 34 ++++++++++++++++++++--------------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/airflow/models/renderedtifields.py b/airflow/models/renderedtifields.py
index 0572c89..23b2b78 100644
--- a/airflow/models/renderedtifields.py
+++ b/airflow/models/renderedtifields.py
@@ -28,6 +28,7 @@ from airflow.models.base import ID_LEN, Base
 from airflow.models.taskinstance import TaskInstance
 from airflow.serialization.helpers import serialize_template_field
 from airflow.settings import json
+from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import provide_session
 from airflow.utils.sqlalchemy import UtcDateTime
 
@@ -161,20 +162,7 @@ class RenderedTaskInstanceFields(Base):
                 tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq1),
             ).delete(synchronize_session=False)
         elif session.bind.dialect.name in ["mysql"]:
-            # Fetch Top X records given dag_id & task_id ordered by Execution Date
-            subq1 = tis_to_keep_query.subquery('subq1')
-
-            # Second Subquery
-            # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
-            # Limitation: This version of MySQL does not yet support
-            # LIMIT & IN/ALL/ANY/SOME subquery
-            subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date).subquery('subq2')
-
-            session.query(cls).filter(
-                cls.dag_id == dag_id,
-                cls.task_id == task_id,
-                tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2),
-            ).delete(synchronize_session=False)
+            cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
         else:
             # Fetch Top X records given dag_id & task_id ordered by Execution Date
             tis_to_keep = tis_to_keep_query.all()
@@ -191,3 +179,21 @@ class RenderedTaskInstanceFields(Base):
             ]
 
             session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)
+
+    @classmethod
+    @retry_db_transaction
+    def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
+        # Fetch Top X records given dag_id & task_id ordered by Execution Date
+        subq1 = tis_to_keep_query.subquery('subq1')
+        # Second Subquery
+        # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
+        # Limitation: This version of MySQL does not yet support
+        # LIMIT & IN/ALL/ANY/SOME subquery
+        subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date).subquery('subq2')
+        # This query might deadlock occasionally and should be retried if it fails (see decorator)
+        session.query(cls).filter(
+            cls.dag_id == dag_id,
+            cls.task_id == task_id,
+            tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2),
+        ).delete(synchronize_session=False)
+        session.flush()
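
Note: the retry behaviour added here comes from the retry_db_transaction
decorator imported from airflow.utils.retries. As a rough sketch of the
idea only (not the actual Airflow implementation; the retry count, sleep,
and exception handling below are illustrative assumptions), such a
decorator could look like:

    import functools
    import logging
    import time

    from sqlalchemy.exc import OperationalError

    log = logging.getLogger(__name__)


    def retry_db_transaction(func, _retries=3):
        """Retry the wrapped DB operation when the database raises an
        OperationalError (e.g. a MySQL deadlock), re-raising on the last attempt."""

        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            for attempt in range(1, _retries + 1):
                try:
                    return func(*args, **kwargs)
                except OperationalError:
                    # A real implementation would also roll back the active
                    # session before retrying the transaction.
                    log.debug("Retrying %s: attempt %d of %d failed", func.__name__, attempt, _retries)
                    if attempt == _retries:
                        raise
                    time.sleep(1)

        return wrapped

Applied as above to _remove_old_rendered_ti_fields_mysql, an occasional
deadlocked DELETE is simply re-executed instead of surfacing as an error.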
