This is an automated email from the ASF dual-hosted git repository.
dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 72da8f6b60 Add logic to mark backfills as complete (#42683)
72da8f6b60 is described below
commit 72da8f6b6082ad78faf8706705bad3d01d13b2fc
Author: Daniel Standish <[email protected]>
AuthorDate: Sun Oct 13 19:40:48 2024 -0700
Add logic to mark backfills as complete (#42683)
Periodically check for backfills that should be marked as complete.
---
airflow/jobs/scheduler_job_runner.py | 30 +++++++++++++++++++++++++++++-
tests/jobs/test_scheduler_job.py | 29 +++++++++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 2999ed391b..f6821f57aa 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -30,7 +30,7 @@ from functools import lru_cache, partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
-from sqlalchemy import and_, delete, func, not_, or_, select, text, update
+from sqlalchemy import and_, delete, exists, func, not_, or_, select, text, update
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
from sqlalchemy.sql import expression
@@ -51,6 +51,7 @@ from airflow.models.asset import (
DagScheduleAssetReference,
TaskOutletAssetReference,
)
+from airflow.models.backfill import Backfill
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
@@ -1063,6 +1064,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
self.check_trigger_timeouts,
)
+ timers.call_regular_interval(
+ 30,
+ self._mark_backfills_complete,
+ )
+
timers.call_regular_interval(
conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0),
self._emit_pool_metrics,
@@ -1288,6 +1294,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
guard.commit()
# END: create dagruns
+ @provide_session
+ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
+ """Mark completed backfills as completed."""
+ self.log.debug("checking for completed backfills.")
+ unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED)
+ now = timezone.utcnow()
+ # todo: AIP-78 simplify this function to an update statement
+ query = select(Backfill).where(
+ Backfill.completed_at.is_(None),
+ ~exists(
+ select(DagRun.id).where(
+ and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
+ )
+ ),
+ )
+ backfills = session.scalars(query).all()
+ if not backfills:
+ return
+ self.log.info("marking %s backfills as complete", len(backfills))
+ for b in backfills:
+ b.completed_at = now
+
@add_span
def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None:
"""Create a DAG run and update the dag_model to control if/when the next DAGRun should be created."""
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 18ca7c6332..7eae1639d0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -6399,3 +6399,32 @@ class TestSchedulerJobQueriesCount:
prefix = "Collected database query count mismatches:"
joined = "\n\n".join(failures)
raise AssertionError(f"{prefix}\n\n{joined}")
+
+
+def test_mark_backfills_completed(dag_maker, session):
+ clear_db_backfills()
+ with dag_maker(serialized=True, dag_id="test_mark_backfills_completed", schedule="@daily") as dag:
+ BashOperator(task_id="hi", bash_command="echo hi")
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2021-01-01"),
+ to_date=pendulum.parse("2021-01-03"),
+ max_active_runs=10,
+ reverse=False,
+ dag_run_conf={},
+ )
+ session.expunge_all()
+ runner = SchedulerJobRunner(
+ job=Job(job_type=SchedulerJobRunner.job_type, executor=MockExecutor(do_update=False))
+ )
+ runner._mark_backfills_complete()
+ b = session.get(Backfill, b.id)
+ assert b.completed_at is None
+ session.expunge_all()
+ drs = session.scalars(select(DagRun).where(DagRun.dag_id == dag.dag_id))
+ for dr in drs:
+ dr.state = DagRunState.SUCCESS
+ session.commit()
+ runner._mark_backfills_complete()
+ b = session.get(Backfill, b.id)
+ assert b.completed_at.timestamp() > 0