This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 58ba6620616 Fix misleading comment and pin reprocess behaviour for partitioned backfill (#68618)
58ba6620616 is described below

commit 58ba6620616e20ab96b02af937d0f7d5f9ef4c7f
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jun 17 13:39:58 2026 +0800

    Fix misleading comment and pin reprocess behaviour for partitioned backfill (#68618)
---
 airflow-core/src/airflow/models/backfill.py     |  9 ++--
 airflow-core/tests/unit/models/test_backfill.py | 60 +++++++++++++++++++++++++
 2 files changed, 65 insertions(+), 4 deletions(-)

diff --git a/airflow-core/src/airflow/models/backfill.py b/airflow-core/src/airflow/models/backfill.py
index 1ecee7f27a3..f27f236e171 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -484,10 +484,6 @@ def _create_backfill_dag_run_partitioned(
     triggering_user_name: str | None,
     session: Session,
 ) -> None:
-    # Partitioned backfills don't currently reprocess existing runs — if a run exists
-    # for this partition, it's recorded as skipped via exception_reason rather than
-    # cleared and re-queued. As a result, this function never calls ``_handle_clear_run``
-    # and therefore doesn't need to forward ``dag_run_conf`` for the reprocess path.
     stmt = _get_latest_dag_run_row_query(dag_id=dag.dag_id, info=info)
     dr = session.scalar(stmt)
     if dr:
@@ -507,6 +503,11 @@ def _create_backfill_dag_run_partitioned(
                 "Skipping dag run creation.", 
non_create_reason=non_create_reason, backfill_id=backfill_id
             )
             return
+
+    # reprocess_behavior allows a retry: create a new run alongside the existing
+    # one rather than clearing it. The prior run is kept as a historical record;
+    # _get_latest_dag_run_row_query picks the newest run by start_date, so the
+    # new backfill run becomes the active one going forward.
     dr = dag.create_dagrun(
         run_id=dag.timetable.generate_run_id(
             run_type=DagRunType.BACKFILL_JOB,
diff --git a/airflow-core/tests/unit/models/test_backfill.py b/airflow-core/tests/unit/models/test_backfill.py
index 2f2bdb6b0d6..531d0ef01ab 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -1259,3 +1259,63 @@ def test_backfill_partitioned_offset_zero_behavior_unchanged(dag_maker, session)
         
str(pendulum.instance(x.partition_date).in_timezone("Asia/Taipei").date()) for 
x in dag_runs
     ]
     assert partition_date_labels == ["2026-02-18", "2026-02-19", "2026-02-20"]
+
+
+def test_partitioned_backfill_reprocess_failed(dag_maker, session):
+    """Partitioned backfill with reprocess_behavior=FAILED creates a new run, keeping the failed one.
+
+    Unlike non-partitioned Dags (which clear and re-queue), partitioned backfills create a
+    fresh run alongside the existing failed one. The prior run is kept as a historical record;
+    the new backfill run becomes the active one (latest start_date wins in deduplication).
+    """
+    with dag_maker(schedule=CronPartitionTimetable("0 0 * * *", 
timezone="Asia/Taipei")) as dag:
+        PythonOperator(task_id="hi", python_callable=print)
+
+    # Determine the UTC partition_date for the 2026-02-18 Asia/Taipei partition.
+    info = next(
+        i for i in dag.iter_dagrun_infos_between(pendulum.parse("2026-02-18"), 
pendulum.parse("2026-02-18"))
+    )
+    expected_partition_date = info.partition_date
+
+    # Simulate a previously-scheduled run that failed.
+    dag_maker.create_dagrun(
+        run_id="scheduled__2026-02-18",
+        logical_date=None,
+        run_type="scheduled",
+        state=DagRunState.FAILED,
+        partition_key=info.partition_key,
+        partition_date=expected_partition_date,
+        session=session,
+    )
+    session.commit()
+
+    b = _create_backfill(
+        dag_id=dag.dag_id,
+        from_date=pendulum.parse("2026-02-18"),
+        to_date=pendulum.parse("2026-02-18"),
+        max_active_runs=1,
+        reverse=False,
+        triggering_user_name="pytest",
+        dag_run_conf={},
+        reprocess_behavior=ReprocessBehavior.FAILED,
+    )
+
+    session.expunge_all()
+
+    all_runs = session.scalars(select(DagRun).where(DagRun.dag_id == 
dag.dag_id)).all()
+    # Two runs: original failed run kept as historical record + new backfill run.
+    assert len(all_runs) == 2
+
+    failed_run = next(r for r in all_runs if r.run_id == 
"scheduled__2026-02-18")
+    assert failed_run.state == DagRunState.FAILED
+
+    backfill_run = next(r for r in all_runs if r.run_id != 
"scheduled__2026-02-18")
+    assert backfill_run.state == DagRunState.QUEUED
+    assert backfill_run.run_type == DagRunType.BACKFILL_JOB
+    assert backfill_run.partition_date == expected_partition_date
+
+    # BackfillDagRun links to the new run, not the historical failed one.
+    bdr = 
session.scalar(select(BackfillDagRun).where(BackfillDagRun.backfill_id == b.id))
+    assert bdr is not None
+    assert bdr.dag_run_id == backfill_run.id
+    assert bdr.partition_key == info.partition_key

Reply via email to