mik-laj commented on a change in pull request #8051: [AIRFLOW-7045] Update SQL
query to delete RenderedTaskInstanceFields
URL: https://github.com/apache/airflow/pull/8051#discussion_r402020441
##########
File path: airflow/models/renderedtifields.py
##########
@@ -110,28 +110,20 @@ def delete_old_records(
return
# Fetch Top X records given dag_id & task_id ordered by Execution Date
- subq1 = (
+ tis_to_keep = (
session
.query(cls.dag_id, cls.task_id, cls.execution_date)
.filter(cls.dag_id == dag_id, cls.task_id == task_id)
.order_by(cls.execution_date.desc())
.limit(num_to_keep)
- .subquery('subq1')
- )
-
- # Second Subquery
- # Workaround for MySQL Limitation
(https://stackoverflow.com/a/19344141/5691525)
- # Limitation: This version of MySQL does not yet support
- # LIMIT & IN/ALL/ANY/SOME subquery
- subq2 = (
- session
- .query(subq1.c.dag_id, subq1.c.task_id, subq1.c.execution_date)
- .subquery('subq2')
- )
+ ).all()
+
+ filter_tis = [not_(and_(
+ cls.dag_id == ti.dag_id,
+ cls.task_id == ti.task_id,
+ cls.execution_date == ti.execution_date
+ )) for ti in tis_to_keep]
Review comment:
```suggestion
filter_tis = [not_(TI.filter_for_tis(tis_to_keep)]
```
This may require a change in the filter_for_tis method. It will look
something like the code below.
```python
@staticmethod
def filter_for_tis(
tis: Iterable[Union["TaskInstance", TaskInstanceKeyType]]
) -> Optional[BooleanClauseList]:
"""Returns SQLAlchemy filter to query selected task instances"""
TI = TaskInstance
if not tis:
return None
if all(isinstance(t, tuple) for t in tis):
if all(len(t) == 4 for t in tis):
filter_for_tis = [
and_(TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date == execution_date)
for dag_id, task_id, execution_date, _ in tis
]
return or_(*filter_for_tis)
if all(len(t) == 3 for t in tis):
filter_for_tis = [
and_(TI.dag_id == dag_id, TI.task_id == task_id,
TI.execution_date == execution_date)
for dag_id, task_id, execution_date in tis
]
return or_(*filter_for_tis)
raise TypeError(
"All elements must have the same type: `TaskInstance`,
`TaskInstanceKey` or "
"tuple[dag_id, task_id, execution_date]."
)
if all(isinstance(t, TaskInstance) for t in tis):
filter_for_tis = ([and_(TI.dag_id == ti.dag_id, # type: ignore
TI.task_id == ti.task_id, # type: ignore
TI.execution_date == ti.execution_date)
# type: ignore
for ti in tis])
return or_(*filter_for_tis)
raise TypeError(
"All elements must have the same type: `TaskInstance`,
`TaskInstanceKey` or "
"tuple[dag_id, task_id, execution_date]."
)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services