This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 58ba6620616 Fix misleading comment and pin reprocess behaviour for
partitioned backfill (#68618)
58ba6620616 is described below
commit 58ba6620616e20ab96b02af937d0f7d5f9ef4c7f
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 17 13:39:58 2026 +0800
Fix misleading comment and pin reprocess behaviour for partitioned backfill
(#68618)
---
airflow-core/src/airflow/models/backfill.py | 9 ++--
airflow-core/tests/unit/models/test_backfill.py | 60 +++++++++++++++++++++++++
2 files changed, 65 insertions(+), 4 deletions(-)
diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py
index 1ecee7f27a3..f27f236e171 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -484,10 +484,6 @@ def _create_backfill_dag_run_partitioned(
triggering_user_name: str | None,
session: Session,
) -> None:
- # Partitioned backfills don't currently reprocess existing runs — if a run
exists
- # for this partition, it's recorded as skipped via exception_reason rather
than
- # cleared and re-queued. As a result, this function never calls
``_handle_clear_run``
- # and therefore doesn't need to forward ``dag_run_conf`` for the reprocess
path.
stmt = _get_latest_dag_run_row_query(dag_id=dag.dag_id, info=info)
dr = session.scalar(stmt)
if dr:
@@ -507,6 +503,11 @@ def _create_backfill_dag_run_partitioned(
"Skipping dag run creation.",
non_create_reason=non_create_reason, backfill_id=backfill_id
)
return
+
+ # reprocess_behavior allows a retry: create a new run alongside the
existing
+ # one rather than clearing it. The prior run is kept as a historical
record;
+ # _get_latest_dag_run_row_query picks the newest run by start_date, so the
+ # new backfill run becomes the active one going forward.
dr = dag.create_dagrun(
run_id=dag.timetable.generate_run_id(
run_type=DagRunType.BACKFILL_JOB,
diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py
index 2f2bdb6b0d6..531d0ef01ab 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -1259,3 +1259,63 @@ def test_backfill_partitioned_offset_zero_behavior_unchanged(dag_maker, session)
str(pendulum.instance(x.partition_date).in_timezone("Asia/Taipei").date()) for
x in dag_runs
]
assert partition_date_labels == ["2026-02-18", "2026-02-19", "2026-02-20"]
+
+
+def test_partitioned_backfill_reprocess_failed(dag_maker, session):
+    """Partitioned backfill with reprocess_behavior=FAILED creates a new run, keeping the failed one.
+
+    Unlike non-partitioned Dags (which clear and re-queue), partitioned backfills create a
+    fresh run alongside the existing failed one. The prior run is kept as a historical record;
+    the new backfill run becomes the active one (latest start_date wins in deduplication).
+    """
+    with dag_maker(schedule=CronPartitionTimetable("0 0 * * *", timezone="Asia/Taipei")) as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+
+    # Determine the UTC partition_date for the 2026-02-18 Asia/Taipei partition.
+    info = next(
+        i for i in dag.iter_dagrun_infos_between(pendulum.parse("2026-02-18"), pendulum.parse("2026-02-18"))
+    )
+    expected_partition_date = info.partition_date
+
+    # Simulate a previously-scheduled run that failed.
+    dag_maker.create_dagrun(
+        run_id="scheduled__2026-02-18",
+        logical_date=None,
+        run_type="scheduled",
+        state=DagRunState.FAILED,
+        partition_key=info.partition_key,
+        partition_date=expected_partition_date,
+        session=session,
+    )
+    session.commit()
+
+    b = _create_backfill(
+        dag_id=dag.dag_id,
+        from_date=pendulum.parse("2026-02-18"),
+        to_date=pendulum.parse("2026-02-18"),
+        max_active_runs=1,
+        reverse=False,
+        triggering_user_name="pytest",
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.FAILED,
+    )
+
+    session.expunge_all()
+
+    all_runs = session.scalars(select(DagRun).where(DagRun.dag_id == dag.dag_id)).all()
+    # Two runs: original failed run kept as historical record + new backfill run.
+    assert len(all_runs) == 2
+
+    failed_run = next(r for r in all_runs if r.run_id == "scheduled__2026-02-18")
+    assert failed_run.state == DagRunState.FAILED
+
+    backfill_run = next(r for r in all_runs if r.run_id != "scheduled__2026-02-18")
+    assert backfill_run.state == DagRunState.QUEUED
+    assert backfill_run.run_type == DagRunType.BACKFILL_JOB
+    assert backfill_run.partition_date == expected_partition_date
+
+    # BackfillDagRun links to the new run, not the historical failed one.
+    bdr = session.scalar(select(BackfillDagRun).where(BackfillDagRun.backfill_id == b.id))
+    assert bdr is not None
+    assert bdr.dag_run_id == backfill_run.id
+    assert bdr.partition_key == info.partition_key