mik-laj commented on a change in pull request #8051: [AIRFLOW-7045] Update SQL query to delete RenderedTaskInstanceFields
URL: https://github.com/apache/airflow/pull/8051#discussion_r402020441
 
 

 ##########
 File path: airflow/models/renderedtifields.py
 ##########
 @@ -110,28 +110,20 @@ def delete_old_records(
             return
 
         # Fetch Top X records given dag_id & task_id ordered by Execution Date
-        subq1 = (
+        tis_to_keep = (
             session
             .query(cls.dag_id, cls.task_id, cls.execution_date)
             .filter(cls.dag_id == dag_id, cls.task_id == task_id)
             .order_by(cls.execution_date.desc())
             .limit(num_to_keep)
-            .subquery('subq1')
-        )
-
-        # Second Subquery
-        # Workaround for MySQL Limitation (https://stackoverflow.com/a/19344141/5691525)
-        # Limitation: This version of MySQL does not yet support
-        # LIMIT & IN/ALL/ANY/SOME subquery
-        subq2 = (
-            session
-            .query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date)
-            .subquery('subq2')
-        )
+        ).all()
+
+        filter_tis = [not_(and_(
+            cls.dag_id == ti.dag_id,
+            cls.task_id == ti.task_id,
+            cls.execution_date == ti.execution_date
+        )) for ti in tis_to_keep]
 
 Review comment:
   ```suggestion
           filter_tis = [not_(TI.filter_for_tis(tis_to_keep))]
   ```
   This may require a change in the `filter_for_tis` method. It will look something like the code below.
   ```python
       @staticmethod
       def filter_for_tis(
           tis: Iterable[Union["TaskInstance", TaskInstanceKeyType]]
       ) -> Optional[BooleanClauseList]:
           """Returns SQLAlchemy filter to query selected task instances.

           :param tis: task instances to match. Every element must be of the same
               kind: ``TaskInstance`` objects, 4-tuples (only the first three items
               are used, the fourth is ignored) or 3-tuples of
               ``(dag_id, task_id, execution_date)``.
           :return: ``None`` when ``tis`` is empty, otherwise an ``or_`` of one
               ``and_(dag_id, task_id, execution_date)`` clause per element.
           :raises TypeError: when the elements are of mixed kinds, or are tuples
               whose lengths are not all 3 or all 4.
           """
           TI = TaskInstance
           # Materialise once: ``tis`` is only promised to be an Iterable, and a
           # one-shot iterator would always be truthy and would be exhausted by
           # the first ``all(...)`` pass below.
           tis = list(tis)
           if not tis:
               return None

           error_message = (
               "All elements must have the same type: `TaskInstance`, `TaskInstanceKey` or "
               "tuple[dag_id, task_id, execution_date]."
           )

           if all(isinstance(t, tuple) for t in tis):
               same_length = all(len(t) == 4 for t in tis) or all(len(t) == 3 for t in tis)
               if not same_length:
                   raise TypeError(error_message)
               # Only the leading (dag_id, task_id, execution_date) part is compared.
               keys = [t[:3] for t in tis]  # type: ignore
           elif all(isinstance(t, TaskInstance) for t in tis):
               keys = [(ti.dag_id, ti.task_id, ti.execution_date) for ti in tis]  # type: ignore
           else:
               raise TypeError(error_message)

           filter_for_tis = [
               and_(
                   TI.dag_id == dag_id,
                   TI.task_id == task_id,
                   TI.execution_date == execution_date,
               )
               for dag_id, task_id, execution_date in keys
           ]
           return or_(*filter_for_tis)
   
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to