This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 37c43ddd997 Fix backfill marked complete before DagRuns are created (#62561)
37c43ddd997 is described below
commit 37c43ddd99754b9e512a93ad0b35ced548b260c9
Author: Shivam Rastogi <[email protected]>
AuthorDate: Thu Apr 2 02:15:32 2026 -0700
Fix backfill marked complete before DagRuns are created (#62561)
---
airflow-core/newsfragments/62561.bugfix.rst | 1 +
.../src/airflow/jobs/scheduler_job_runner.py | 11 +-
airflow-core/tests/unit/jobs/test_scheduler_job.py | 259 ++++++++++++++++++++-
3 files changed, 269 insertions(+), 2 deletions(-)
diff --git a/airflow-core/newsfragments/62561.bugfix.rst b/airflow-core/newsfragments/62561.bugfix.rst
new file mode 100644
index 00000000000..9b40b89e211
--- /dev/null
+++ b/airflow-core/newsfragments/62561.bugfix.rst
@@ -0,0 +1 @@
+Fix backfill marked complete before DagRuns are created; add age-based cleanup for orphaned backfills.
diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
index bda95bbf4b0..67006a7d8ee 100644
--- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py
+++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -71,7 +71,7 @@ from airflow.models.asset import (
TaskInletAssetReference,
TaskOutletAssetReference,
)
-from airflow.models.backfill import Backfill
+from airflow.models.backfill import Backfill, BackfillDagRun
from airflow.models.callback import Callback, CallbackType, ExecutorCallback
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
@@ -1861,8 +1861,17 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
unfinished_states = (DagRunState.RUNNING, DagRunState.QUEUED)
now = timezone.utcnow()
# todo: AIP-78 simplify this function to an update statement
+ initializing_cutoff = now - timedelta(minutes=2)
query = select(Backfill).where(
Backfill.completed_at.is_(None),
+ # Guard: backfill must have at least one association,
+ # otherwise it is still being set up (see #61375).
+ # Allow cleanup of orphaned backfills older than 2 minutes
+ # that failed during initialization and never got any associations.
+ or_(
+            exists(select(BackfillDagRun.id).where(BackfillDagRun.backfill_id == Backfill.id)),
+ Backfill.created_at < initializing_cutoff,
+ ),
~exists(
select(DagRun.id).where(
                 and_(DagRun.backfill_id == Backfill.id, DagRun.state.in_(unfinished_states))
diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py
index 76183c7fd73..5ccbb7e4056 100644
--- a/airflow-core/tests/unit/jobs/test_scheduler_job.py
+++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -65,7 +65,7 @@ from airflow.models.asset import (
AssetPartitionDagRun,
PartitionedAssetKeyLog,
)
-from airflow.models.backfill import Backfill, _create_backfill
+from airflow.models.backfill import Backfill, BackfillDagRun, ReprocessBehavior, _create_backfill
from airflow.models.callback import ExecutorCallback
 from airflow.models.dag import DagModel, get_last_dagrun, infer_automated_data_interval
from airflow.models.dag_version import DagVersion
@@ -8792,6 +8792,263 @@ def test_mark_backfills_completed(dag_maker, session):
assert b.completed_at.timestamp() > 0
+def test_mark_backfills_complete_skips_initializing_backfill(dag_maker, session):
+ clear_db_backfills()
+ dag_id = "test_backfill_race_lifecycle"
+ with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+ BashOperator(task_id="hi", bash_command="echo hi")
+ b = Backfill(
+ dag_id=dag_id,
+ from_date=pendulum.parse("2021-01-01"),
+ to_date=pendulum.parse("2021-01-03"),
+ max_active_runs=10,
+ dag_run_conf={},
+ reprocess_behavior=ReprocessBehavior.NONE,
+ )
+ session.add(b)
+ session.commit()
+ backfill_id = b.id
+ session.expunge_all()
+ runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+ )
+ runner._mark_backfills_complete()
+ b = session.get(Backfill, backfill_id)
+ assert b.completed_at is None
+ session.expunge_all()
+ dr = DagRun(
+ dag_id=dag_id,
+ run_id="backfill__2021-01-01T00:00:00+00:00",
+ run_type=DagRunType.BACKFILL_JOB,
+ logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+ run_after=pendulum.parse("2021-01-02"),
+ state=DagRunState.SUCCESS,
+ backfill_id=backfill_id,
+ )
+ session.add(dr)
+ session.flush()
+ session.add(
+ BackfillDagRun(
+ backfill_id=backfill_id,
+ dag_run_id=dr.id,
+ logical_date=pendulum.parse("2021-01-01"),
+ sort_ordinal=1,
+ )
+ )
+ session.commit()
+ session.expunge_all()
+ runner._mark_backfills_complete()
+ b = session.get(Backfill, backfill_id)
+ assert b.completed_at is not None
+
+
+def test_mark_backfills_complete_cleans_orphan_after_cutoff(dag_maker, session):
+    """A backfill older than 2 minutes with no BackfillDagRun rows should be auto-completed."""
+ clear_db_backfills()
+ dag_id = "test_backfill_orphan_cleanup"
+ with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+ BashOperator(task_id="hi", bash_command="echo hi")
+ b = Backfill(
+ dag_id=dag_id,
+ from_date=pendulum.parse("2021-01-01"),
+ to_date=pendulum.parse("2021-01-03"),
+ max_active_runs=10,
+ dag_run_conf={},
+ reprocess_behavior=ReprocessBehavior.NONE,
+ )
+ session.add(b)
+ session.commit()
+ backfill_id = b.id
+ session.expunge_all()
+    # Travel 3 minutes into the future so the backfill is past the 2-minute cutoff
+    with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False):
+        runner = SchedulerJobRunner(
+            job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+ )
+ runner._mark_backfills_complete()
+ b = session.get(Backfill, backfill_id)
+ assert b.completed_at is not None
+
+
+def test_mark_backfills_complete_keeps_old_backfill_with_running_dagruns(dag_maker, session):
+    """An old backfill (>2 min) with running DagRuns must NOT be marked complete."""
+ clear_db_backfills()
+ dag_id = "test_backfill_old_with_runs"
+ with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+ BashOperator(task_id="hi", bash_command="echo hi")
+ b = Backfill(
+ dag_id=dag_id,
+ from_date=pendulum.parse("2021-01-01"),
+ to_date=pendulum.parse("2021-01-03"),
+ max_active_runs=10,
+ dag_run_conf={},
+ reprocess_behavior=ReprocessBehavior.NONE,
+ )
+ session.add(b)
+ session.commit()
+ backfill_id = b.id
+ dr = DagRun(
+ dag_id=dag_id,
+ run_id="backfill__2021-01-01T00:00:00+00:00",
+ run_type=DagRunType.BACKFILL_JOB,
+ logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+ run_after=pendulum.parse("2021-01-02"),
+ state=DagRunState.RUNNING,
+ backfill_id=backfill_id,
+ )
+ session.add(dr)
+ session.flush()
+ session.add(
+ BackfillDagRun(
+ backfill_id=backfill_id,
+ dag_run_id=dr.id,
+ logical_date=pendulum.parse("2021-01-01"),
+ sort_ordinal=1,
+ )
+ )
+ session.commit()
+ session.expunge_all()
+    # Travel 3 minutes into the future; backfill is old but has a RUNNING DagRun
+    with time_machine.travel(pendulum.now("UTC").add(minutes=3), tick=False):
+        runner = SchedulerJobRunner(
+            job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+ )
+ runner._mark_backfills_complete()
+ b = session.get(Backfill, backfill_id)
+ assert b.completed_at is None
+
+
+def test_mark_backfills_complete_young_backfill_with_finished_runs(dag_maker, session):
+    """A young backfill (<2 min) with all SUCCESS DagRuns completes immediately."""
+ clear_db_backfills()
+ dag_id = "test_backfill_young_finished"
+ with dag_maker(serialized=True, dag_id=dag_id, schedule="@daily"):
+ BashOperator(task_id="hi", bash_command="echo hi")
+ b = Backfill(
+ dag_id=dag_id,
+ from_date=pendulum.parse("2021-01-01"),
+ to_date=pendulum.parse("2021-01-03"),
+ max_active_runs=10,
+ dag_run_conf={},
+ reprocess_behavior=ReprocessBehavior.NONE,
+ )
+ session.add(b)
+ session.commit()
+ backfill_id = b.id
+ dr = DagRun(
+ dag_id=dag_id,
+ run_id="backfill__2021-01-01T00:00:00+00:00",
+ run_type=DagRunType.BACKFILL_JOB,
+ logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+ run_after=pendulum.parse("2021-01-02"),
+ state=DagRunState.SUCCESS,
+ backfill_id=backfill_id,
+ )
+ session.add(dr)
+ session.flush()
+ session.add(
+ BackfillDagRun(
+ backfill_id=backfill_id,
+ dag_run_id=dr.id,
+ logical_date=pendulum.parse("2021-01-01"),
+ sort_ordinal=1,
+ )
+ )
+ session.commit()
+ session.expunge_all()
+ # No time travel — backfill was just created, should still complete
+ runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+ )
+ runner._mark_backfills_complete()
+ b = session.get(Backfill, backfill_id)
+ assert b.completed_at is not None
+
+
+def test_mark_backfills_complete_multiple_independent(dag_maker, session):
+    """Two backfills: one finished, one running — only the finished one completes."""
+ clear_db_backfills()
+ with dag_maker(serialized=True, dag_id="dag_finished", schedule="@daily"):
+ BashOperator(task_id="hi", bash_command="echo hi")
+ with dag_maker(serialized=True, dag_id="dag_running", schedule="@daily"):
+ BashOperator(task_id="hi", bash_command="echo hi")
+ b_finished = Backfill(
+ dag_id="dag_finished",
+ from_date=pendulum.parse("2021-01-01"),
+ to_date=pendulum.parse("2021-01-03"),
+ max_active_runs=10,
+ dag_run_conf={},
+ reprocess_behavior=ReprocessBehavior.NONE,
+ )
+ b_running = Backfill(
+ dag_id="dag_running",
+ from_date=pendulum.parse("2021-01-01"),
+ to_date=pendulum.parse("2021-01-03"),
+ max_active_runs=10,
+ dag_run_conf={},
+ reprocess_behavior=ReprocessBehavior.NONE,
+ )
+ session.add_all([b_finished, b_running])
+ session.commit()
+ finished_id = b_finished.id
+ running_id = b_running.id
+ # Finished backfill: SUCCESS DagRun
+ dr1 = DagRun(
+ dag_id="dag_finished",
+ run_id="backfill__2021-01-01T00:00:00+00:00",
+ run_type=DagRunType.BACKFILL_JOB,
+ logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+ run_after=pendulum.parse("2021-01-02"),
+ state=DagRunState.SUCCESS,
+ backfill_id=finished_id,
+ )
+ session.add(dr1)
+ session.flush()
+ session.add(
+ BackfillDagRun(
+ backfill_id=finished_id,
+ dag_run_id=dr1.id,
+ logical_date=pendulum.parse("2021-01-01"),
+ sort_ordinal=1,
+ )
+ )
+ # Running backfill: RUNNING DagRun
+ dr2 = DagRun(
+ dag_id="dag_running",
+ run_id="backfill__2021-01-01T00:00:00+00:00",
+ run_type=DagRunType.BACKFILL_JOB,
+ logical_date=pendulum.parse("2021-01-01"),
+        data_interval=(pendulum.parse("2021-01-01"), pendulum.parse("2021-01-02")),
+ run_after=pendulum.parse("2021-01-02"),
+ state=DagRunState.RUNNING,
+ backfill_id=running_id,
+ )
+ session.add(dr2)
+ session.flush()
+ session.add(
+ BackfillDagRun(
+ backfill_id=running_id,
+ dag_run_id=dr2.id,
+ logical_date=pendulum.parse("2021-01-01"),
+ sort_ordinal=1,
+ )
+ )
+ session.commit()
+ session.expunge_all()
+ runner = SchedulerJobRunner(
+        job=Job(job_type=SchedulerJobRunner.job_type), executors=[MockExecutor(do_update=False)]
+ )
+ runner._mark_backfills_complete()
+ b_finished = session.get(Backfill, finished_id)
+ b_running = session.get(Backfill, running_id)
+ assert b_finished.completed_at is not None
+ assert b_running.completed_at is None
+
+
class Key1Mapper(CorePartitionMapper):
"""Partition Mapper that returns only key-1 as downstream key"""