ashb commented on a change in pull request #18616:
URL: https://github.com/apache/airflow/pull/18616#discussion_r719206339



##########
File path: airflow/models/renderedtifields.py
##########
@@ -191,3 +179,20 @@ def delete_old_records(
             ]
 
             session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)
+
+    @classmethod
+    @retry_db_transaction
+    def remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):

Review comment:
       ```suggestion
       def _remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
   ```
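The suggestion makes the helper private while keeping it stacked under `@classmethod` with a retry decorator. A minimal stdlib sketch of that pattern follows, assuming a hypothetical `retry_on_transient_error` decorator and a hypothetical `TransientDBError` standing in for a deadlock error; neither is Airflow's `retry_db_transaction`, whose implementation may differ. `@classmethod` sits on top so the retry wrapper wraps the plain function and receives `cls` as its first argument.

```python
import functools
import time


class TransientDBError(Exception):
    """Hypothetical stand-in for a deadlock error raised by a DB driver."""


def retry_on_transient_error(max_attempts=3, delay=0.0):
    """Hypothetical decorator: re-run the wrapped call on TransientDBError."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except TransientDBError:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error
                    time.sleep(delay)
        return wrapper
    return decorator


class RenderedFieldsStore:
    calls = 0  # counts helper invocations so the retries are observable

    @classmethod
    def delete_old_records(cls, fail_times):
        # Public entry point delegates the retryable part to a private helper.
        return cls._remove_old_records(fail_times)

    @classmethod  # outermost, so the retry wrapper still receives cls
    @retry_on_transient_error(max_attempts=3)
    def _remove_old_records(cls, fail_times):
        cls.calls += 1
        if cls.calls <= fail_times:
            raise TransientDBError("deadlock detected")
        return "deleted"
```

Retrying only the narrow helper keeps the retried unit small: callers of the public method never see a transient failure unless every attempt fails.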

##########
File path: airflow/models/renderedtifields.py
##########
@@ -161,20 +162,7 @@ def delete_old_records(
                 tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq1),
             ).delete(synchronize_session=False)
         elif session.bind.dialect.name in ["mysql"]:
-            # Fetch Top X records given dag_id & task_id ordered by Execution Date
-            subq1 = tis_to_keep_query.subquery('subq1')
-
-            # Second Subquery
-            # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
-            # Limitation: This version of MySQL does not yet support
-            # LIMIT & IN/ALL/ANY/SOME subquery
-            subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date).subquery('subq2')
-
-            session.query(cls).filter(
-                cls.dag_id == dag_id,
-                cls.task_id == task_id,
-                tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2),
-            ).delete(synchronize_session=False)
+            cls.remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)

Review comment:
       ```suggestion
               cls._remove_old_rendered_ti_fields_mysql(dag_id, session, task_id, tis_to_keep_query)
   ```
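The MySQL branch moves into the helper unchanged. Its core trick is wrapping the `LIMIT` subquery in a second derived table, because this MySQL version rejects `LIMIT` directly inside an `IN`/`NOT IN` subquery. As a hedged illustration of the SQL shape only, the sketch below uses Python's built-in `sqlite3` with a made-up `rendered_fields` table. SQLite does not have this restriction, and row-value `NOT IN` needs SQLite 3.15 or newer. This is not the SQL that SQLAlchemy emits.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE rendered_fields (dag_id TEXT, task_id TEXT, execution_date TEXT)"
)
rows = [("dag", "task", f"2021-09-0{day}") for day in range(1, 7)]
rows.append(("dag", "other_task", "2021-09-01"))  # must be left untouched
conn.executemany("INSERT INTO rendered_fields VALUES (?, ?, ?)", rows)

# Keep the newest :num_to_keep runs for one dag_id/task_id and delete the rest.
# The inner LIMIT query is aliased subq1; the SELECT wrapping it plays the
# role of subq2, so the NOT IN never sees a LIMIT subquery directly.
DELETE_OLD = """
DELETE FROM rendered_fields
WHERE dag_id = :dag_id AND task_id = :task_id
  AND (dag_id, task_id, execution_date) NOT IN (
    SELECT dag_id, task_id, execution_date FROM (
      SELECT dag_id, task_id, execution_date FROM rendered_fields
      WHERE dag_id = :dag_id AND task_id = :task_id
      ORDER BY execution_date DESC
      LIMIT :num_to_keep
    ) AS subq1
  )
"""
conn.execute(DELETE_OLD, {"dag_id": "dag", "task_id": "task", "num_to_keep": 3})
conn.commit()

remaining = conn.execute(
    "SELECT task_id, execution_date FROM rendered_fields "
    "ORDER BY task_id, execution_date"
).fetchall()
```

Only the three newest `task` rows survive, and the `other_task` row is not touched because the outer `WHERE` scopes the delete to one `dag_id`/`task_id`.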

##########
File path: airflow/models/renderedtifields.py
##########
@@ -191,3 +179,20 @@ def delete_old_records(
             ]
 
             session.query(cls).filter(and_(*filter_tis)).delete(synchronize_session=False)
+
+    @classmethod
+    @retry_db_transaction
+    def remove_old_rendered_ti_fields_mysql(cls, dag_id, session, task_id, tis_to_keep_query):
+        # Fetch Top X records given dag_id & task_id ordered by Execution Date
+        subq1 = tis_to_keep_query.subquery('subq1')
+        # Second Subquery
+        # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
+        # Limitation: This version of MySQL does not yet support
+        # LIMIT & IN/ALL/ANY/SOME subquery
+        subq2 = session.query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date).subquery('subq2')
+        # This query might deadlock occasionally and it should be retried if fails (see decorator)
+        session.query(cls).filter(
+            cls.dag_id == dag_id,
+            cls.task_id == task_id,
+            tuple_(cls.dag_id, cls.task_id, cls.execution_date).notin_(subq2),
+        ).delete(synchronize_session=False)

Review comment:
       I think this should have
   ```suggestion
           ).delete(synchronize_session=False)
           session.flush()
   ```
   
   Otherwise it _might_ not make it to the DB and instead stay pending in the SQLA unit-of-work. (Not sure in this case; `query.delete()` might issue the query directly.)
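The concern is the gap between an operation recorded in the session and one actually sent to the database. A toy model of that distinction follows, using a hypothetical `ToySession` class over `sqlite3`. It does not reproduce how SQLAlchemy's `Session` or `Query.delete()` behave; it only shows that queued work reaches the database, still uncommitted, when `flush()` runs.

```python
import sqlite3


class ToySession:
    """Hypothetical, heavily simplified unit-of-work; NOT SQLAlchemy's Session."""

    def __init__(self, conn):
        self.conn = conn
        self.pending = []  # statements recorded but not yet sent to the DB

    def delete_later(self, sql, params=()):
        # Queue the DELETE instead of executing it, like a pending ORM change.
        self.pending.append((sql, params))

    def flush(self):
        # Send queued statements inside the current transaction (no commit).
        for sql, params in self.pending:
            self.conn.execute(sql, params)
        self.pending.clear()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])


def count_rows():
    return conn.execute("SELECT COUNT(*) FROM t").fetchone()[0]


session = ToySession(conn)
session.delete_later("DELETE FROM t WHERE id < ?", (3,))
before_flush = count_rows()  # the DB has not seen the DELETE yet
session.flush()
after_flush = count_rows()  # the DELETE was sent; still uncommitted
```

An explicit flush right after the delete removes the ambiguity either way: if the statement was already executed, the flush is a cheap no-op.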




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
