uranusjr commented on a change in pull request #17643:
URL: https://github.com/apache/airflow/pull/17643#discussion_r690069059
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,6 +289,45 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
+    @provide_session
+    def _remove_finished_tis_without_dagrun(self, session: Session = None) -> None:
+        """
+        Remove all TaskInstances for which the DagRun is missing but the task instances are in
+        finished state. Possibly the dag went missing and the task instances were failed.
+        """
+        TI = models.TaskInstance
+        query = (
+            session.query(TI)
+            .outerjoin(TI.dag_run)
+            .filter(TI.state.in_(State.finished))
+            .filter(DR.run_id.is_(None))
+        )
+
+        try:
+            count = query.count()
+            if count == 0:
+                return
+        except Exception:
+            # We want to catch any error and log it.
+            # This is to avoid the task-runner from failing.
+            self.log.error(
+                "Could not remove failed tasks for dag_id: %s",
+                ", ".join({str(ti.dag_id) for ti in query}),
+            )
+            return
+
+        query = with_row_locks(query, of=TI, session=session, **skip_locked(session=session)).all()
+        dag_ids = {ti.dag_id for ti in query}
+        if len(query) > 0:
+            session.query(TI).filter(TI.task_id.in_([ti.task_id for ti in query])).delete(
+                synchronize_session=False
+            )
Review comment:
I’d just run `delete()`; if the query is empty, the deletion call would just be a no-op anyway (and likely not much slower than the additional query implied by the `len()` call).
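
For illustration only (not part of the original comment), a minimal sketch of that simplification, reusing the `TI`, `with_row_locks`, and `skip_locked` names exactly as they appear in the diff above:

```python
# Sketch of the suggested shape: no count()/len() guard, just lock, collect, delete.
# If the query matches nothing, the IN (...) filter is empty and delete() is a no-op.
tis = with_row_locks(query, of=TI, session=session, **skip_locked(session=session)).all()
session.query(TI).filter(TI.task_id.in_([ti.task_id for ti in tis])).delete(
    synchronize_session=False
)
```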
##########
File path: airflow/jobs/scheduler_job.py
##########
@@ -289,6 +289,45 @@ def __get_concurrency_maps(
             task_map[(dag_id, task_id)] = count
         return dag_map, task_map
+    @provide_session
+    def _remove_finished_tis_without_dagrun(self, session: Session = None) -> None:
+        """
+        Remove all TaskInstances for which the DagRun is missing but the task instances are in
+        finished state. Possibly the dag went missing and the task instances were failed.
+        """
+        TI = models.TaskInstance
+        query = (
+            session.query(TI)
+            .outerjoin(TI.dag_run)
+            .filter(TI.state.in_(State.finished))
+            .filter(DR.run_id.is_(None))
+        )
+
+        try:
+            count = query.count()
+            if count == 0:
+                return
+        except Exception:
+            # We want to catch any error and log it.
+            # This is to avoid the task-runner from failing.
+            self.log.error(
+                "Could not remove failed tasks for dag_id: %s",
+                ", ".join({str(ti.dag_id) for ti in query}),
+            )
+            return
+
+        query = with_row_locks(query, of=TI, session=session, **skip_locked(session=session)).all()
+        dag_ids = {ti.dag_id for ti in query}
+        if len(query) > 0:
+            session.query(TI).filter(TI.task_id.in_([ti.task_id for ti in query])).delete(
+                synchronize_session=False
+            )
+            self.log.warning(
+                "Deleted %s task instances in finished state without DagRun. " "Affected DAGs: %s",
+                count,
+                dag_ids,
+            )
Review comment:
Black messed up the string here.
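
For reference (my own rendering, not code from the PR), the two implicitly concatenated literals that Black left behind would read as a single string like this:

```python
self.log.warning(
    "Deleted %s task instances in finished state without DagRun. Affected DAGs: %s",
    count,
    dag_ids,
)
```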