This is an automated email from the ASF dual-hosted git repository.

dstandish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 72da8f6b60 Add logic to mark backfills as complete (#42683)
72da8f6b60 is described below

commit 72da8f6b6082ad78faf8706705bad3d01d13b2fc
Author: Daniel Standish <[email protected]>
AuthorDate: Sun Oct 13 19:40:48 2024 -0700

    Add logic to mark backfills as complete (#42683)
    
    Periodically check for backfills that should be marked as complete.
---
 airflow/jobs/scheduler_job_runner.py | 30 +++++++++++++++++++++++++++++-
 tests/jobs/test_scheduler_job.py     | 29 +++++++++++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/airflow/jobs/scheduler_job_runner.py b/airflow/jobs/scheduler_job_runner.py
index 2999ed391b..f6821f57aa 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -30,7 +30,7 @@ from functools import lru_cache, partial
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, Collection, Iterable, Iterator
 
-from sqlalchemy import and_, delete, func, not_, or_, select, text, update
+from sqlalchemy import and_, delete, exists, func, not_, or_, select, text, update
 from sqlalchemy.exc import OperationalError
 from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
 from sqlalchemy.sql import expression
@@ -51,6 +51,7 @@ from airflow.models.asset import (
     DagScheduleAssetReference,
     TaskOutletAssetReference,
 )
+from airflow.models.backfill import Backfill
 from airflow.models.dag import DAG, DagModel
 from airflow.models.dagbag import DagBag
 from airflow.models.dagrun import DagRun
@@ -1063,6 +1064,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             self.check_trigger_timeouts,
         )
 
+        timers.call_regular_interval(
+            30,
+            self._mark_backfills_complete,
+        )
+
         timers.call_regular_interval(
             conf.getfloat("scheduler", "pool_metrics_interval", fallback=5.0),
             self._emit_pool_metrics,
@@ -1288,6 +1294,28 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
         guard.commit()
         # END: create dagruns
 
+    @provide_session
+    def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None:
+        """Set ``completed_at`` on open backfills that no longer have queued or running dag runs."""
+        self.log.debug("checking for completed backfills.")
+        unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED)
+        now = timezone.utcnow()
+        # todo: AIP-78 simplify this function to an update statement
+        runs = select(DagRun.id).where(DagRun.backfill_id == Backfill.id)  # correlated to Backfill
+        query = select(Backfill).where(
+            Backfill.completed_at.is_(None),
+            ~exists(runs.where(DagRun.state.in_(unfinished_states))),
+            # Runs may not exist yet right after the Backfill row becomes visible; only treat a
+            # run-less backfill as done once a grace period has passed since it was created.
+            or_(exists(runs), Backfill.created_at < now - timedelta(minutes=2)),
+        )
+        backfills = session.scalars(query).all()
+        if not backfills:
+            return
+        self.log.info("marking %s backfills as complete", len(backfills))
+        for b in backfills:
+            b.completed_at = now
+
     @add_span
     def _create_dag_runs(self, dag_models: Collection[DagModel], session: 
Session) -> None:
         """Create a DAG run and update the dag_model to control if/when the 
next DAGRun should be created."""
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 18ca7c6332..7eae1639d0 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -6399,3 +6399,32 @@ class TestSchedulerJobQueriesCount:
                 prefix = "Collected database query count mismatches:"
                 joined = "\n\n".join(failures)
                 raise AssertionError(f"{prefix}\n\n{joined}")
+
+
+def test_mark_backfills_completed(dag_maker, session):
+    clear_db_backfills()
+    with dag_maker(serialized=True, dag_id="test_mark_backfills_completed", schedule="@daily") as dag:
+        BashOperator(task_id="hi", bash_command="echo hi")
+    b = _create_backfill(
+        dag_id=dag.dag_id,
+        from_date=pendulum.parse("2021-01-01"),
+        to_date=pendulum.parse("2021-01-03"),
+        max_active_runs=10,
+        reverse=False,
+        dag_run_conf={},
+    )
+    session.expunge_all()
+    runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type, executor=MockExecutor(do_update=False))
+    )
+    runner._mark_backfills_complete()  # backfill still has unfinished runs
+    b = session.get(Backfill, b.id)
+    assert b.completed_at is None
+    session.expunge_all()
+    drs = session.scalars(select(DagRun).where(DagRun.dag_id == dag.dag_id))
+    for dr in drs:
+        dr.state = DagRunState.SUCCESS
+    session.commit()
+    runner._mark_backfills_complete()  # every run is now SUCCESS
+    b = session.get(Backfill, b.id)
+    assert b.completed_at is not None

Reply via email to