yuqian90 commented on a change in pull request #15382:
URL: https://github.com/apache/airflow/pull/15382#discussion_r614626032
##########
File path: airflow/models/taskinstance.py
##########
@@ -205,19 +206,25 @@ def clear_task_instances(
for job in session.query(BaseJob).filter(BaseJob.id.in_(job_ids)).all(): # noqa
job.state = State.SHUTDOWN
- if activate_dag_runs and tis:
+ if (dag_run_state is not False) and tis:
from airflow.models.dagrun import DagRun # Avoid circular import
+ dates_by_dag_id = defaultdict(set)
+ for instance in tis:
+ dates_by_dag_id[instance.dag_id].add(instance.execution_date)
+
drs = (
session.query(DagRun)
.filter(
- DagRun.dag_id.in_({ti.dag_id for ti in tis}),
- DagRun.execution_date.in_({ti.execution_date for ti in tis}),
+ or_(
+ and_(DagRun.dag_id == dag_id, DagRun.execution_date.in_(dates))
Review comment:
This filter is now fixed. Previously it assumed that the execution_dates
were the same for every DagRun being cleared, which is not always true. The
fix is to query hierarchically: for each dag_id, match only the
execution_dates that belong to that dag_id.
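
To illustrate the difference, here is a minimal pure-Python sketch of the
two filters. It is not the Airflow code: plain (dag_id, execution_date)
tuples stand in for DagRun rows and task instances, and the names
old_filter, new_filter, dag_runs and the sample dates are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical (dag_id, execution_date) pairs standing in for DagRun rows.
d1 = datetime(2021, 4, 1)
d2 = datetime(2021, 4, 2)
dag_runs = [("dag_a", d1), ("dag_a", d2), ("dag_b", d1), ("dag_b", d2)]

# Task instances being cleared: dag_a on d1 and dag_b on d2 only.
tis = [("dag_a", d1), ("dag_b", d2)]


def old_filter(runs, tis):
    """Two independent IN clauses: matches the cross product of ids and dates."""
    dag_ids = {dag_id for dag_id, _ in tis}
    dates = {date for _, date in tis}
    return [r for r in runs if r[0] in dag_ids and r[1] in dates]


def new_filter(runs, tis):
    """Group dates per dag_id, then match each dag_id only against its own dates."""
    dates_by_dag_id = defaultdict(set)
    for dag_id, date in tis:
        dates_by_dag_id[dag_id].add(date)
    # .get() avoids creating empty entries for dag_ids that were never cleared.
    return [r for r in runs if r[1] in dates_by_dag_id.get(r[0], set())]


old_matches = old_filter(dag_runs, tis)
new_matches = new_filter(dag_runs, tis)
```

With this sample data the old filter also picks up ("dag_a", d2) and
("dag_b", d1), which were never cleared, while the grouped filter returns
only the two runs that correspond to the cleared task instances.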