This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2217dbc7d73 Backfill partitioned Dags by partition-date range (#67537)
2217dbc7d73 is described below
commit 2217dbc7d7374d7589e21cd676e60665f3e393cf
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 15 18:32:35 2026 +0800
Backfill partitioned Dags by partition-date range (#67537)
---
airflow-core/docs/core-concepts/backfill.rst | 26 +-
airflow-core/src/airflow/cli/cli_config.py | 18 +-
airflow-core/src/airflow/models/backfill.py | 39 +-
.../src/airflow/serialization/definitions/dag.py | 34 +-
airflow-core/src/airflow/timetables/base.py | 21 +-
airflow-core/src/airflow/timetables/trigger.py | 57 ++-
.../core_api/routes/public/test_backfills.py | 128 ++++++
.../unit/cli/commands/test_backfill_command.py | 26 ++
airflow-core/tests/unit/models/test_backfill.py | 437 ++++++++++++++++++++-
airflow-core/tests/unit/models/test_dag.py | 47 ++-
.../unit/timetables/test_trigger_timetable.py | 238 +++++++++++
11 files changed, 1026 insertions(+), 45 deletions(-)
diff --git a/airflow-core/docs/core-concepts/backfill.rst
b/airflow-core/docs/core-concepts/backfill.rst
index 74363381ad4..298a7a32e7e 100644
--- a/airflow-core/docs/core-concepts/backfill.rst
+++ b/airflow-core/docs/core-concepts/backfill.rst
@@ -18,8 +18,8 @@
Backfill
========
-Backfill is when you create runs for past dates of a Dag. Airflow provides a
mechanism
-to do this through the CLI and REST API. You provide a Dag, a start date, and
an end date,
+Backfill is when you create runs for past dates of a Dag. Airflow provides a
mechanism
+to do this through the CLI and REST API. You provide a Dag, a start date, and
an end date,
and Airflow will create runs in the range according to the Dag's schedule.
Backfill does not make sense for Dags that don't have a time-based schedule.
@@ -45,13 +45,13 @@ the Dag ``max_active_runs`` setting.
Run ordering
------------
-You can run your backfill in reverse, i.e. latest runs first. The CLI option
is ``--run-backwards``.
+You can run your backfill in reverse, i.e. latest runs first. The CLI option
is ``--run-backwards``.
Dry run
-------
Backfill dry run is a CLI option that will print out the dates that the
-backfill will consider creating runs for. Whether or not they will be created
+backfill will consider creating runs for. Whether or not they will be created
depends on your chosen reprocessing behavior and the states of any existing
runs in the range at the time you actually run the backfill.
@@ -87,3 +87,21 @@ For UI, follow the following steps:
.. image:: ../img/ui-light/backfill.png
:alt: Backfill pop-up window (Light Mode)
+
+Partitioned Dag backfills
+-------------------------
+
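+For Dags that use a time-based partitioned timetable (e.g. ``CronPartitionTimetable``),
+the same ``--from-date`` / ``--to-date`` flags are used as for any other backfill.
+Airflow automatically detects whether the Dag is partitioned and interprets
+the date range as a partition-date range accordingly: only the calendar dates
+of ``--from-date`` and ``--to-date`` are used (the time component is ignored),
+both ends are inclusive, and each date is treated as a local day in the
+timetable's timezone. One Dag run is created per partition within the window.
+Asset-driven partitioned timetables (e.g. ``PartitionedAssetTimetable``) have
+no time-based schedule and are rejected for backfill.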
+
+Example: backfill the partitions from 2026-02-18 to 2026-02-20::
+
+ airflow backfill create --dag-id my_partitioned_dag \
+ --from-date 2026-02-18 \
+ --to-date 2026-02-20
+
+Concurrency is controlled by the same ``--max-active-runs`` option used for
+all backfills.
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index a462a77e10a..e26f9737d8f 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -374,10 +374,22 @@ ARG_TEAM_NAME = Arg(("name",), help="Team name")
# backfill
ARG_BACKFILL_DAG = Arg(flags=("--dag-id",), help="The dag to backfill.",
required=True)
ARG_BACKFILL_FROM_DATE = Arg(
- ("--from-date",), help="Earliest logical date to backfill.",
type=parsedate, required=True
+ ("--from-date",),
+ help=(
+ "Backfill window start (inclusive). "
+ "For partitioned Dags, this range is interpreted as the partition-date
range (auto-detected)."
+ ),
+ type=parsedate,
+ required=True,
)
ARG_BACKFILL_TO_DATE = Arg(
- ("--to-date",), help="Latest logical date to backfill", type=parsedate,
required=True
+ ("--to-date",),
+ help=(
+ "Backfill window end (inclusive). "
+ "For partitioned Dags, this range is interpreted as the partition-date
range (auto-detected)."
+ ),
+ type=parsedate,
+ required=True,
)
ARG_DAG_RUN_CONF = Arg(flags=("--dag-run-conf",), help="JSON dag run
configuration.")
ARG_RUN_BACKWARDS = Arg(
@@ -417,8 +429,6 @@ ARG_BACKFILL_RUN_ON_LATEST_VERSION = Arg(
action=argparse.BooleanOptionalAction,
default=None,
)
-
-
# misc
ARG_TREAT_DAG_ID_AS_REGEX = Arg(
("--treat-dag-id-as-regex",),
diff --git a/airflow-core/src/airflow/models/backfill.py
b/airflow-core/src/airflow/models/backfill.py
index 65a5e532062..1ecee7f27a3 100644
--- a/airflow-core/src/airflow/models/backfill.py
+++ b/airflow-core/src/airflow/models/backfill.py
@@ -246,8 +246,14 @@ def _get_latest_dag_run_row_query(*, dag_id: str, info:
DagRunInfo):
from airflow.models import DagRun
stmt = select(DagRun).where(DagRun.dag_id == dag_id)
- if info.partition_key is not None:
- stmt = stmt.where(DagRun.partition_key == info.partition_key)
+ if info.partition_date is not None:
+ # Filter the runs whose partition_date matches (the UTC instant of the
partition
+ # tick), not the partition_key string. partition_date is the
canonical, format-
+ # independent identity of a partition: a scheduled run and a backfill
run for the
+ # same tick always agree on it, even if partition_key is later
formatted differently
+ # (e.g. relabelled in the timetable timezone). Filtering by the string
would let a
+ # backfill create duplicate runs whenever the key format changed.
+ stmt = stmt.where(DagRun.partition_date == info.partition_date)
if info.logical_date is not None:
stmt = stmt.where(DagRun.logical_date == info.logical_date)
stmt = stmt.order_by(DagRun.start_date.is_(None), DagRun.start_date.desc())
@@ -326,14 +332,21 @@ def _do_dry_run(
if dag.allowed_run_types is not None and DagRunType.BACKFILL_JOB not in
dag.allowed_run_types:
raise DagRunTypeNotAllowed(f"Dag with dag_id: '{dag_id}' does not
allow backfill runs")
- _validate_backfill_params(dag, reverse, from_date, to_date,
reprocess_behavior, dag_run_conf)
-
+ _validate_backfill_params(
+ dag,
+ reverse,
+ from_date,
+ to_date,
+ reprocess_behavior,
+ dag_run_conf,
+ )
dagrun_info_list = _get_info_list(
dag=dag,
from_date=from_date,
to_date=to_date,
reverse=reverse,
)
+
for info in dagrun_info_list:
if TYPE_CHECKING:
assert info.logical_date
@@ -534,10 +547,11 @@ def _get_info_list(
dag: SerializedDAG,
) -> list[DagRunInfo]:
infos = dag.iter_dagrun_infos_between(from_date, to_date)
- now = timezone.utcnow()
- dagrun_info_list = [
- x for x in infos if x.partition_key or (x.data_interval and
x.data_interval.end < now)
- ]
+ if not dag.timetable.partitioned:
+ now = timezone.utcnow()
+ dagrun_info_list = [x for x in infos if x.data_interval and
x.data_interval.end < now]
+ else:
+ dagrun_info_list = list(infos)
if reverse:
dagrun_info_list = list(reversed(dagrun_info_list))
return dagrun_info_list
@@ -640,7 +654,14 @@ def _create_backfill(
f"There can be only one running backfill per Dag."
)
- _validate_backfill_params(dag, reverse, from_date, to_date,
reprocess_behavior, dag_run_conf)
+ _validate_backfill_params(
+ dag,
+ reverse,
+ from_date,
+ to_date,
+ reprocess_behavior,
+ dag_run_conf,
+ )
br = Backfill(
dag_id=dag_id,
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index ac22806eb55..d4de2a2c479 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -454,11 +454,28 @@ class SerializedDAG:
latest: datetime.datetime,
) -> Iterable[DagRunInfo]:
"""
- Yield DagRunInfo using this DAG's timetable between given interval.
-
- DagRunInfo instances yielded if their ``logical_date`` is not earlier
- than ``earliest``, nor later than ``latest``. The instances are ordered
- by their ``logical_date`` from earliest to latest.
+ Yield DagRunInfo using this Dag's timetable between given interval.
+
+ For non-partitioned timetables, DagRunInfo instances are yielded if
+ their ``logical_date`` is not earlier than ``earliest``, nor later than
+ ``latest``. The instances are ordered by their ``logical_date`` from
+ earliest to latest.
+
+ For partitioned timetables, the iteration axis is ``partition_date``
+ rather than ``logical_date``. The calendar dates of ``earliest`` and
+ ``latest`` (``.date()`` of the coerced values; no explicit conversion to
+ the timetable timezone is done first) are resolved to day-bounds
+ (timetable-timezone local midnight converted to UTC) and the iteration
+ walks the half-open interval
+ ``[resolve_day_bound(earliest.date()), resolve_day_bound(latest.date() + 1 day))``,
+ making ``latest``'s calendar date inclusive regardless of its time
+ component. Each yielded :class:`~airflow.timetables.base.DagRunInfo`
+ has ``logical_date=None``, ``data_interval=None``, and
+ ``run_after == partition_date``; see
+ :meth:`~airflow.timetables.base.Timetable.iter_partition_dagrun_infos`
+ for details.
+
+ Day-bound dispatch is centralised here rather than at every call site
+ (e.g. backfill) so partitioned vs. non-partitioned routing is handled
+ once. See
https://github.com/apache/airflow/pull/67537#discussion_r3386682447
"""
if earliest is None:
earliest = self._time_restriction.earliest
@@ -467,6 +484,13 @@ class SerializedDAG:
earliest = coerce_datetime(earliest)
latest = coerce_datetime(latest)
+ if self.timetable.partitioned:
+ yield from self.timetable.iter_partition_dagrun_infos(
+ earliest_date=earliest.date(),
+ latest_date=latest.date(),
+ )
+ return
+
restriction = TimeRestriction(earliest, latest, catchup=True)
info = None
diff --git a/airflow-core/src/airflow/timetables/base.py
b/airflow-core/src/airflow/timetables/base.py
index f53f6010f98..b07e5c98c93 100644
--- a/airflow-core/src/airflow/timetables/base.py
+++ b/airflow-core/src/airflow/timetables/base.py
@@ -26,7 +26,7 @@ from airflow._shared.timezones import timezone
from airflow.serialization.definitions.assets import SerializedAssetBase
if TYPE_CHECKING:
- from collections.abc import Iterator, Sequence
+ from collections.abc import Iterable, Iterator, Sequence
from pendulum import DateTime
@@ -258,6 +258,25 @@ class Timetable(Protocol):
)
raise NotImplementedError(msg)
+ def iter_partition_dagrun_infos(
+ self,
+ *,
+ earliest_date: datetime.date,
+ latest_date: datetime.date,
+ ) -> Iterable[DagRunInfo]:
+ """
+ Yield one DagRunInfo per partition for calendar days in
``[earliest_date, latest_date]`` (both inclusive).
+
+ Only called for partitioned timetables (``partitioned is True``). The
default
+ implementation raises :exc:`NotImplementedError`; timetables that set
+ ``partitioned = True`` must override this.
+ """
+ if self.partitioned:
+ msg = f"{type(self).__name__} is partitioned but does not
implement iter_partition_dagrun_infos."
+ else:
+ msg = f"{type(self).__name__} is not partitioned"
+ raise NotImplementedError(msg)
+
def resolve_day_bound(self, day: datetime.date) -> DateTime:
"""
Return the UTC instant of *day*'s start (midnight).
diff --git a/airflow-core/src/airflow/timetables/trigger.py
b/airflow-core/src/airflow/timetables/trigger.py
index d530d5b898a..b3ac7cbf2e0 100644
--- a/airflow-core/src/airflow/timetables/trigger.py
+++ b/airflow-core/src/airflow/timetables/trigger.py
@@ -21,6 +21,7 @@ import functools
import math
import operator
import time
+from datetime import timedelta
from types import NoneType
from typing import TYPE_CHECKING, Any
@@ -33,6 +34,8 @@ from airflow.timetables.base import DagRunInfo, DataInterval,
Timetable
from airflow.utils.strings import get_random_string
if TYPE_CHECKING:
+ from collections.abc import Iterable
+
from dateutil.relativedelta import relativedelta
from pendulum import DateTime
from pendulum.tz.timezone import FixedTimezone, Timezone
@@ -464,8 +467,56 @@ class CronPartitionTimetable(CronTriggerTimetable):
partition_key = self._format_key(partition_date)
return partition_date, partition_key
+ def iter_partition_dagrun_infos(
+ self,
+ *,
+ earliest_date: datetime.date,
+ latest_date: datetime.date,
+ ) -> Iterable[DagRunInfo]:
+ """
+ Yield one DagRunInfo per cron tick for calendar days in
``[earliest_date, latest_date]`` (both inclusive).
+
+ Iteration walks directly along the partition_date axis — one cron tick
per
+ partition — without any reverse mapping from run_after. Each tick
yields:
+
+ - ``partition_date = current`` (the cron tick itself, as a UTC instant)
+ - ``partition_key`` formatted by :meth:`_format_key` (local-tz label)
+ - ``run_after = partition_date`` (identical to the tick)
+ - ``data_interval = None``
+
+ **Design note — ``run_after := partition_date``.**
+ For ``run_offset != 0`` this differs from the cron run-time a scheduled
+ run would carry; this is intentional. ``run_after`` is not
load-bearing
+ for backfill execution: deduplication is keyed on ``partition_date``,
scheduling gates
+ on ``run_after <= now()`` (always satisfied for past partitions), and
+ ordering by ``BackfillDagRun.sort_ordinal`` (``run_after`` is only the
+ final tiebreaker). Setting ``run_after = partition_date`` is the
simplest
+ correct choice and avoids the need for a reverse mapping.
+
+ :param earliest_date: inclusive lower bound calendar date; the UTC
window start is
+ ``resolve_day_bound(earliest_date)``, i.e. local midnight of that
day.
+ :param latest_date: inclusive upper bound calendar date; the UTC
window end is
+ ``resolve_day_bound(latest_date + 1 day)`` (exclusive), so all
ticks within
+ ``latest_date``'s local day are included.
+ """
+ earliest_partition_date = self.resolve_day_bound(earliest_date)
+ latest_partition_date = self.resolve_day_bound(latest_date +
timedelta(days=1))
+ current = self._align_to_next(earliest_partition_date)
+ while current < latest_partition_date:
+ partition_key = self._format_key(current)
+ yield DagRunInfo(
+ run_after=current,
+ data_interval=None,
+ partition_date=current,
+ partition_key=partition_key,
+ )
+ current = self._get_next(current)
+
def _format_key(self, partition_date: DateTime) -> str:
- return partition_date.strftime(self._key_format)
+ # partition_date is a UTC instant; format the key in the timetable
timezone so the
+ # key reflects the local partition date the user reasons about (e.g.
an Asia/Taipei
+ # midnight partition keys as "...T00:00:00", not the prior UTC day's
"...T16:00:00").
+ return
partition_date.in_timezone(self._timezone).strftime(self._key_format)
def next_dagrun_info_v2(
self,
@@ -474,8 +525,8 @@ class CronPartitionTimetable(CronTriggerTimetable):
restriction: TimeRestriction,
) -> DagRunInfo | None:
# todo: AIP-76 add test for this logic
- # todo: AIP-76 we will have to ensure that the start / end times apply
to the partition date ideally,
- # rather than just the run after
+ # Scheduler scheduling path: uses next_dagrun_info_v2 to advance
run_after one tick
+ # at a time. Backfill iterates partitions directly via
timetable.iter_partition_dagrun_infos.
if restriction.catchup:
if last_dagrun_info is not None:
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
index 66be4e63252..42b48fc0c49 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py
@@ -32,6 +32,7 @@ from airflow.models.dag import DAG
from airflow.models.dagbundle import DagBundleModel
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
+from airflow.sdk import CronPartitionTimetable
from airflow.utils.session import provide_session
from airflow.utils.state import DagRunState
@@ -838,6 +839,133 @@ class TestCreateBackfillDryRun(TestBackfillEndpoint):
)
+class TestCreateBackfillPartitioned(TestBackfillEndpoint):
+ """Tests for partition-date selector paths on POST /backfills and POST
/backfills/dry_run."""
+
+ @pytest.mark.parametrize(
+ ("url", "extra_assertions"),
+ [
+ ("/backfills", "create"),
+ ("/backfills/dry_run", "dry_run"),
+ ],
+ )
+ def test_partitioned_dag_with_from_to_dates(self, session, dag_maker,
test_client, url, extra_assertions):
+ """Partitioned Dag + from_date/to_date succeeds on both routes
(auto-detected)."""
+ with dag_maker(
+ session=session,
+ dag_id="TEST_PARTITIONED_DAG",
+ schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei"),
+ ):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ data = {
+ "dag_id": "TEST_PARTITIONED_DAG",
+ "from_date": "2026-02-18T00:00:00+00:00",
+ "to_date": "2026-02-20T00:00:00+00:00",
+ "max_active_runs": 5,
+ "run_backwards": False,
+ }
+ response = test_client.post(url=url, json=data)
+ assert response.status_code == 200
+ if extra_assertions == "create":
+ assert response.json() == {
+ "completed_at": mock.ANY,
+ "created_at": mock.ANY,
+ "dag_display_name": "TEST_PARTITIONED_DAG",
+ "dag_id": "TEST_PARTITIONED_DAG",
+ "dag_run_conf": None,
+ "from_date": "2026-02-18T00:00:00Z",
+ "id": mock.ANY,
+ "is_paused": False,
+ "reprocess_behavior": "none",
+ "max_active_runs": 5,
+ "to_date": "2026-02-20T00:00:00Z",
+ "updated_at": mock.ANY,
+ }
+ elif extra_assertions == "dry_run":
+ resp_json = response.json()
+ assert resp_json["total_entries"] >= 1
+ first = resp_json["backfills"][0]
+ assert "partition_key" in first
+ assert "partition_date" in first
+
+ @pytest.mark.parametrize(
+ "url",
+ [
+ "/backfills",
+ "/backfills/dry_run",
+ ],
+ )
+ def test_missing_from_to_date_returns_422(self, session, dag_maker,
test_client, url):
+ """Missing required from_date/to_date fields → 422 from schema
validation."""
+ with dag_maker(session=session, dag_id="TEST_DAG_1", schedule="0 0 * *
*"):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ data = {
+ "dag_id": "TEST_DAG_1",
+ "max_active_runs": 5,
+ "run_backwards": False,
+ }
+ response = test_client.post(url=url, json=data)
+ assert response.status_code == 422
+
+ @pytest.mark.parametrize(
+ "url",
+ [
+ "/backfills",
+ "/backfills/dry_run",
+ ],
+ )
+ def test_partitioned_dag_at_cap_single_day_returns_200(self, session,
dag_maker, test_client, url):
+ """Partitioned Dag: from_date == to_date (same day) is allowed → 200
(auto-detected)."""
+ with dag_maker(
+ session=session,
+ dag_id="TEST_PARTITIONED_DAG",
+ schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei"),
+ ):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ data = {
+ "dag_id": "TEST_PARTITIONED_DAG",
+ "from_date": "2026-02-18T00:00:00+00:00",
+ "to_date": "2026-02-18T00:00:00+00:00",
+ "max_active_runs": 5,
+ "run_backwards": False,
+ }
+ response = test_client.post(url=url, json=data)
+ assert response.status_code == 200
+
+ @pytest.mark.parametrize(
+ "url",
+ [
+ "/backfills",
+ "/backfills/dry_run",
+ ],
+ )
+ def test_partitioned_dag_from_date_after_to_date_returns_422(self,
session, dag_maker, test_client, url):
+ """Partitioned Dag + from_date > to_date → 422
(InvalidBackfillDateRange)."""
+ with dag_maker(
+ session=session,
+ dag_id="TEST_PARTITIONED_DAG",
+ schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei"),
+ ):
+ EmptyOperator(task_id="mytask")
+ session.commit()
+
+ data = {
+ "dag_id": "TEST_PARTITIONED_DAG",
+ "from_date": "2026-05-13T00:00:00+00:00",
+ "to_date": "2026-05-12T00:00:00+00:00",
+ "max_active_runs": 5,
+ "run_backwards": False,
+ }
+ response = test_client.post(url=url, json=data)
+ assert response.status_code == 422
+
+
class TestCancelBackfill(TestBackfillEndpoint):
def test_cancel_backfill(self, session, test_client):
(dag,) = self._create_dag_models()
diff --git a/airflow-core/tests/unit/cli/commands/test_backfill_command.py
b/airflow-core/tests/unit/cli/commands/test_backfill_command.py
index ab6c9378346..c897fa20443 100644
--- a/airflow-core/tests/unit/cli/commands/test_backfill_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_backfill_command.py
@@ -214,6 +214,32 @@ class TestCliBackfill:
with pytest.raises(ValueError, match="Invalid JSON in --dag-run-conf"):
airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args))
+ def test_backfill_create_missing_from_date_raises(self):
+ """Test that omitting --from-date causes argparse to exit with an
error."""
+ args = [
+ "backfill",
+ "create",
+ "--dag-id",
+ "example_bash_operator",
+ "--to-date",
+ DEFAULT_DATE.isoformat(),
+ ]
+ with pytest.raises(SystemExit):
+ self.parser.parse_args(args)
+
+ def test_backfill_create_missing_to_date_raises(self):
+ """Test that omitting --to-date causes argparse to exit with an
error."""
+ args = [
+ "backfill",
+ "create",
+ "--dag-id",
+ "example_bash_operator",
+ "--from-date",
+ DEFAULT_DATE.isoformat(),
+ ]
+ with pytest.raises(SystemExit):
+ self.parser.parse_args(args)
+
@mock.patch("airflow.cli.commands.backfill_command._create_backfill")
def test_backfill_with_empty_dag_run_conf(self, mock_create):
"""Test that empty dag_run_conf is properly parsed."""
diff --git a/airflow-core/tests/unit/models/test_backfill.py
b/airflow-core/tests/unit/models/test_backfill.py
index d5f36b78bf8..2f2bdb6b0d6 100644
--- a/airflow-core/tests/unit/models/test_backfill.py
+++ b/airflow-core/tests/unit/models/test_backfill.py
@@ -43,7 +43,7 @@ from airflow.models.backfill import (
_get_latest_dag_run_row_query,
)
from airflow.providers.standard.operators.python import PythonOperator
-from airflow.sdk import Asset
+from airflow.sdk import Asset, CronPartitionTimetable,
PartitionedAssetTimetable
from airflow.ti_deps.dep_context import DepContext
from airflow.timetables.base import DagRunInfo
from airflow.utils.state import DagRunState, TaskInstanceState
@@ -233,7 +233,7 @@ def
test_create_backfill_clear_existing_bundle_version(dag_maker, session, run_o
@pytest.mark.parametrize(
"existing",
[
- ["2026-02-22T16:00:00", "2026-02-23T16:00:00"],
+ ["2026-02-23T00:00:00", "2026-02-24T00:00:00"],
[],
],
)
@@ -246,17 +246,23 @@ def
test_create_backfill_clear_existing_bundle_version(dag_maker, session, run_o
)
def test_create_backfill_partitioned(reverse, existing, start_date, dag_maker,
session):
"""Verify partitioned backfill creates new runs per partition."""
- from airflow.sdk import CronPartitionTimetable
-
with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
PythonOperator(task_id="hi", python_callable=print)
+ # Map each partition_key label to the partition_date (UTC instant) the
timetable computes.
+ partition_date_by_key = {
+ info.partition_key: info.partition_date
+ for info in
dag.iter_dagrun_infos_between(pendulum.parse("2026-02-14"),
pendulum.parse("2026-02-25"))
+ }
+ # Existing runs stand in for historical scheduled runs: they carry
partition_date, which is
+ # what deduplication keys on (see _get_latest_dag_run_row_query).
for date in existing:
dag_maker.create_dagrun(
start_date=start_date,
run_id=f"scheduled_{date}",
logical_date=None,
partition_key=date,
+ partition_date=partition_date_by_key[date],
session=session,
)
session.commit()
@@ -278,8 +284,11 @@ def test_create_backfill_partitioned(reverse, existing,
start_date, dag_maker, s
.order_by(BackfillDagRun.sort_ordinal)
)
dag_runs = session.scalars(query).all()
+ # partition_key is a timetable-local label (e.g. "2026-02-15T00:00:00"),
so its own date
+ # is the partition's calendar date. to_date is inclusive, so the window is
2026-02-15..24;
+ # the seeded existing partitions (reprocess_behavior=None) are skipped.
partition_keys = [str(datetime.fromisoformat(x.partition_key).date()) for
x in dag_runs]
- expected_dates = [f"2026-02-{d}" for d in range(15, 22 if existing else
24)]
+ expected_dates = [f"2026-02-{d}" for d in range(15, 23 if existing else
25)]
if reverse:
expected_dates = list(reversed(expected_dates))
@@ -290,13 +299,83 @@ def test_create_backfill_partitioned(reverse, existing,
start_date, dag_maker, s
# path must populate it alongside partition_key. Verify that backfill
copies
# info.partition_date faithfully — i.e. the stored value matches what the
# timetable computed for each partition_key.
- expected_partition_date_by_key = {
- info.partition_key: info.partition_date
- for info in
dag.iter_dagrun_infos_between(pendulum.parse("2026-02-15"),
pendulum.parse("2026-02-24"))
- }
- assert [x.partition_date for x in dag_runs] == [
- expected_partition_date_by_key[x.partition_key] for x in dag_runs
- ]
+ # Asia/Taipei partitions have partition_date (UTC instant) one day earlier
than the
+ # calendar label: label "2026-02-15" → partition_date 2026-02-14T16:00:00Z.
+ assert [x.partition_date for x in dag_runs] ==
[partition_date_by_key[x.partition_key] for x in dag_runs]
+
+
+def test_create_backfill_partitioned_key_uses_timetable_timezone(dag_maker,
session):
+ """partition_key is labelled in the timetable timezone, not UTC.
+
+ An Asia/Taipei midnight partition is the 16:00Z instant of the previous
UTC day, so its
+ key must read as the local date ("2026-02-18T00:00:00"), not
"2026-02-17T16:00:00", while
+ the stored partition_date keeps the underlying UTC instant.
+ """
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-18"),
+ to_date=pendulum.parse("2026-02-18"),
+ max_active_runs=1,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+ query =
select(DagRun).join(BackfillDagRun.dag_run).where(BackfillDagRun.backfill_id ==
b.id)
+ dag_runs = session.scalars(query).all()
+
+ assert [x.partition_key for x in dag_runs] == ["2026-02-18T00:00:00"]
+ assert [x.partition_date for x in dag_runs] == [pendulum.datetime(2026, 2,
17, 16, tz="UTC")]
+
+
+def
test_backfill_partitioned_does_not_duplicate_legacy_utc_keyed_run(dag_maker,
session):
+ """A backfill must not duplicate a run that was keyed with the old
UTC-instant label.
+
+ CronPartitionTimetable shipped in 3.2.0/3.2.1 formatting partition_key off
the UTC
+ instant; it now labels the key in the timetable timezone. Deduplication
keys on
+ partition_date (the UTC instant), so a backfill over a window that already
has a run
+ skips it regardless of the key-string format — no duplicate partition run
is created.
+ """
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+
+ # Asia/Taipei midnight 2026-02-18 is the UTC instant 2026-02-17T16:00:00Z.
The historical
+ # run carries the *old* UTC-instant key, not today's local label.
+ partition_date = pendulum.datetime(2026, 2, 17, 16, tz="UTC")
+ dag_maker.create_dagrun(
+ run_id="scheduled_legacy",
+ logical_date=None,
+ run_type="scheduled",
+ state=DagRunState.SUCCESS,
+ partition_key="2026-02-17T16:00:00",
+ partition_date=partition_date,
+ session=session,
+ )
+ session.commit()
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-18"),
+ to_date=pendulum.parse("2026-02-18"),
+ max_active_runs=1,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+
+ # No new run created for this partition: still exactly one DagRun at that
partition_date.
+ runs_at_partition = session.scalars(
+ select(DagRun).where(DagRun.dag_id == dag.dag_id,
DagRun.partition_date == partition_date)
+ ).all()
+ assert [x.run_id for x in runs_at_partition] == ["scheduled_legacy"]
+ # The backfill recorded the partition as already existing instead of
creating a duplicate.
+ bdr =
session.scalars(select(BackfillDagRun).where(BackfillDagRun.backfill_id ==
b.id)).all()
+ assert len(bdr) == 1
+ assert bdr[0].dag_run_id is None
+ assert bdr[0].exception_reason ==
BackfillDagRunExceptionReason.ALREADY_EXISTS
@pytest.mark.parametrize(
@@ -758,30 +837,41 @@ def
test_depends_on_past_requires_reprocess_failed(dep_on_past, behavior, dag_ma
def test_get_latest_dag_run_row_partitioned(session: Session):
- partition_key = "2026-02-22T16:00:00"
+ """Deduplication matches on partition_date, independent of the
partition_key string.
+
+ The seeded runs stand in for historical scheduled runs keyed with the
UTC-instant label
+ that CronPartitionTimetable shipped in 3.2.0/3.2.1; the incoming info
carries the same
+ instant relabelled in the timetable timezone. The query must still find
the existing run
+ on partition_date so a backfill does not duplicate an already-scheduled
partition run.
+ """
+ partition_date = timezone.parse("2026-02-22T16:00:00Z")
+ legacy_utc_key = "2026-02-22T16:00:00"
for start_date in [timezone.parse("2025-05-12"), None,
timezone.parse("2026-02-23")]:
session.add(
DagRun(
dag_id="test_dag_id",
run_id=f"test_run_id_{get_random_string()}",
start_date=start_date,
- run_type="manual",
+ run_type="scheduled",
state=DagRunState.SUCCESS,
- partition_key=partition_key,
+ partition_key=legacy_utc_key,
+ partition_date=partition_date,
)
)
session.commit()
info = DagRunInfo(
run_after=pendulum.now(),
data_interval=None,
- partition_date=pendulum.DateTime.fromisoformat(partition_key),
- partition_key=partition_key,
+ partition_date=partition_date,
+ partition_key="2026-02-23T00:00:00", # same instant, relabelled in
the timetable tz
)
stmt = _get_latest_dag_run_row_query(dag_id="test_dag_id", info=info)
dr = session.scalar(stmt)
assert dr is not None
assert dr.start_date == timezone.parse("2026-02-23")
+ # Matched despite the differing key string — deduplication is on
partition_date.
+ assert dr.partition_key == legacy_utc_key
@pytest.mark.parametrize(
@@ -791,6 +881,11 @@ def test_get_latest_dag_run_row_partitioned(session:
Session):
pytest.param("@once", {}, id="once"),
pytest.param("@continuous", {"max_active_runs": 1}, id="continuous"),
pytest.param([Asset(uri="test://asset", name="test-asset")], {},
id="asset-scheduled"),
+ pytest.param(
+ PartitionedAssetTimetable(assets=Asset(uri="test://partitioned",
name="test-partitioned")),
+ {},
+ id="partitioned-asset",
+ ),
],
)
def test_create_backfill_non_periodic_schedule_rejected(schedule, dag_kwargs,
dag_maker, session):
@@ -819,6 +914,11 @@ def
test_create_backfill_non_periodic_schedule_rejected(schedule, dag_kwargs, da
pytest.param("@once", {}, id="once"),
pytest.param("@continuous", {"max_active_runs": 1}, id="continuous"),
pytest.param([Asset(uri="test://asset", name="test-asset")], {},
id="asset-scheduled"),
+ pytest.param(
+ PartitionedAssetTimetable(assets=Asset(uri="test://partitioned",
name="test-partitioned")),
+ {},
+ id="partitioned-asset",
+ ),
],
)
def test_do_dry_run_non_periodic_schedule_rejected(schedule, dag_kwargs,
dag_maker, session):
@@ -856,3 +956,306 @@ def
test_create_backfill_from_date_after_to_date_raises(dag_maker, session):
triggering_user_name="pytest",
dag_run_conf={},
)
+
+
+def test_create_backfill_partitioned_from_date_after_to_date_raises(dag_maker,
session):
+ """Partitioned Dag: a from_date later than to_date raises
InvalidBackfillDateRange ("... must not be after to_date")."""
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ with pytest.raises(InvalidBackfillDateRange, match="must not be after
to_date"):
+ _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-05-13"),
+ to_date=pendulum.parse("2026-05-12"),
+ max_active_runs=2,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+
+
+@pytest.mark.parametrize("reverse", [False, True])
+def test_backfill_partitioned_with_partition_window(reverse, dag_maker,
session):
+ """Partitioned Dag: a 3-day from/to window yields one run per Asia/Taipei day,
newest first when reverse=True (partition mode auto-detected from the timetable)."""
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-18"),
+ to_date=pendulum.parse("2026-02-20"),
+ max_active_runs=2,
+ reverse=reverse,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+ query = (
+ select(DagRun)
+ .join(BackfillDagRun.dag_run)
+ .where(BackfillDagRun.backfill_id == b.id)
+ .order_by(BackfillDagRun.sort_ordinal)
+ )
+ dag_runs = session.scalars(query).all()
+ partition_date_labels = [
+
str(pendulum.instance(x.partition_date).in_timezone("Asia/Taipei").date()) for
x in dag_runs
+ ]
+ expected = ["2026-02-18", "2026-02-19", "2026-02-20"]
+ if reverse:
+ expected = list(reversed(expected))
+ assert partition_date_labels == expected
+
+
+def test_backfill_partitioned_at_cap_single_day(dag_maker, session):
+ """Partitioned Dag: from_date == to_date is an inclusive one-day window and
yields exactly one run, for that Asia/Taipei day (partition mode auto-detected)."""
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-18"),
+ to_date=pendulum.parse("2026-02-18"),
+ max_active_runs=2,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+ query =
select(DagRun).join(BackfillDagRun.dag_run).where(BackfillDagRun.backfill_id ==
b.id)
+ dag_runs = session.scalars(query).all()
+ assert len(dag_runs) == 1
+ assert (
+
str(pendulum.instance(dag_runs[0].partition_date).in_timezone("Asia/Taipei").date())
== "2026-02-18"
+ )
+
+
+def test_backfill_orm_from_to_synthesised(dag_maker, session):
+ """Partitioned Dag: the persisted Backfill row's from_date / to_date carry the
same calendar dates as the requested from/to range."""
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ pds = pendulum.parse("2026-02-18")
+ pde = pendulum.parse("2026-02-20")
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pds,
+ to_date=pde,
+ max_active_runs=2,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+
+ br = session.get(Backfill, b.id)
+ assert br is not None
+ assert br.from_date.date() == pds.date()
+ assert br.to_date.date() == pde.date()
+
+
+def test_do_dry_run_with_partition_window(dag_maker, session):
+ """_do_dry_run on a partitioned Dag returns one info per Asia/Taipei day of the
inclusive from/to window, in ascending order (partition mode auto-detected)."""
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ infos = list(
+ _do_dry_run(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-18"),
+ to_date=pendulum.parse("2026-02-20"),
+ reverse=False,
+ reprocess_behavior=ReprocessBehavior.NONE,
+ session=session,
+ )
+ )
+ dates =
[str(pendulum.instance(i.partition_date).in_timezone("Asia/Taipei").date()) for
i in infos]
+ assert dates == ["2026-02-18", "2026-02-19", "2026-02-20"]
+
+
+@pytest.mark.db_test
+def test_create_backfill_partitioned_non_utc_boundary(dag_maker, session):
+ """
+ Regression: partition_date selection axis must use calendar-date
comparison in the
+ timetable timezone, not a UTC-instant comparison.
+
+ Asia/Taipei is UTC+8. A partition labelled "2026-02-15" has
+ partition_date = 2026-02-14T16:00:00Z (the UTC instant of Taipei midnight).
+ Passing that UTC instant through an instant comparison against the user's
+ from_date 2026-02-15 (parsed as 2026-02-15T00:00:00Z) would
+ yield 2026-02-14T16:00Z < 2026-02-15T00:00Z, dropping the boundary day.
+ Selection must instead compare calendar dates expressed in the timetable
+ timezone, so the boundary day is kept.
+ """
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei")) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-15"),
+ to_date=pendulum.parse("2026-02-24"),
+ max_active_runs=10,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf=None,
+ )
+
+ query = (
+ select(DagRun)
+ .join(BackfillDagRun.dag_run)
+ .where(BackfillDagRun.backfill_id == b.id)
+ .order_by(BackfillDagRun.sort_ordinal)
+ )
+ dag_runs = session.scalars(query).all()
+ partition_date_labels = [
+
str(pendulum.instance(x.partition_date).in_timezone("Asia/Taipei").date()) for
x in dag_runs
+ ]
+
+ # Independent oracle: all 10 calendar days from 2026-02-15 to 2026-02-24
inclusive.
+ # Built by hand, independent of the timetable enumeration code, so it
cannot drift with
+ # changes to that code.
+ expected = [f"2026-02-{d:02d}" for d in range(15, 25)]
+ assert partition_date_labels == expected
+
+ # Cap-boundary pair: the days just outside the window must not leak in.
+ # at-cap: 2026-02-24 (end boundary) is included — already covered by
full-sequence assert above.
+ # over-cap: 2026-02-25 (one past end) and 2026-02-14 (one before start)
must not appear.
+ assert "2026-02-25" not in partition_date_labels
+ assert "2026-02-14" not in partition_date_labels
+
+
+@pytest.mark.db_test
+@pytest.mark.parametrize("run_offset", [1, 2, -1, -2])
+def test_backfill_partitioned_nonzero_offset_full_window(run_offset,
dag_maker, session):
+ """Non-zero run_offset: backfill returns exactly the requested partition
window (no leaks, no gaps).
+
+ Checks that direct partition-axis enumeration via
timetable.iter_partition_dagrun_infos
+ produces the correct set of runs for each parametrized run_offset (±1, ±2).
+ """
+ with dag_maker(
+ schedule=CronPartitionTimetable("0 0 * * *", timezone="Asia/Taipei",
run_offset=run_offset)
+ ) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-18"),
+ to_date=pendulum.parse("2026-02-20"),
+ max_active_runs=10,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+ query = (
+ select(DagRun)
+ .join(BackfillDagRun.dag_run)
+ .where(BackfillDagRun.backfill_id == b.id)
+ .order_by(BackfillDagRun.sort_ordinal)
+ )
+ dag_runs = session.scalars(query).all()
+ partition_date_labels = [
+
str(pendulum.instance(x.partition_date).in_timezone("Asia/Taipei").date()) for
x in dag_runs
+ ]
+ # Full-sequence equality: all three days, nothing more, nothing less.
+ assert partition_date_labels == ["2026-02-18", "2026-02-19", "2026-02-20"]
+ # Behavioral change: for offset != 0, run_after == partition_date (not the
offset cron tick).
+ for dr in dag_runs:
+ assert dr.run_after == dr.partition_date, (
+ f"run_offset={run_offset}: expected run_after == partition_date, "
+ f"got run_after={dr.run_after!r},
partition_date={dr.partition_date!r}"
+ )
+
+
+@pytest.mark.db_test
+@pytest.mark.parametrize(
+ ("run_offset", "from_date", "to_date", "expected_labels"),
+ [
+ # at-cap: single-day window [2026-02-18, 2026-02-18] must produce
exactly 1 run and
+ # must NOT include the adjacent day (2026-02-17 or 2026-02-19).
+ pytest.param(1, "2026-02-18", "2026-02-18", ["2026-02-18"],
id="offset_1_at_cap"),
+ pytest.param(-1, "2026-02-18", "2026-02-18", ["2026-02-18"],
id="offset_minus1_at_cap"),
+ # neighbor-pair: window [2026-02-18, 2026-02-19] — full-sequence
equality proves
+ # the enumeration produces both days and stops exactly at the boundary.
+ pytest.param(1, "2026-02-18", "2026-02-19", ["2026-02-18",
"2026-02-19"], id="offset_1_two_day_pair"),
+ pytest.param(
+ -1, "2026-02-18", "2026-02-19", ["2026-02-18", "2026-02-19"],
id="offset_minus1_two_day_pair"
+ ),
+ ],
+)
+def test_backfill_partitioned_nonzero_offset_at_cap_single_day(
+ run_offset, from_date, to_date, expected_labels, dag_maker, session
+):
+ """Non-zero run_offset: single-day and (despite the test name) two-day windows
yield exactly the requested partitions (partition mode auto-detected).
+
+ at-cap cases ([2026-02-18, 2026-02-18]) prove exactly one run is produced
and
+ adjacent days are excluded. neighbor-pair cases ([2026-02-18,
2026-02-19]) prove
+ the enumeration includes both days and stops exactly at the upper boundary.
+ Together they pin the ``>`` vs ``>=`` boundary: at-cap is allowed, one
step over
+ adds the next day without dropping the cap.
+
+ Each parametrize case is an independent test run (separate DB state via
autouse fixture),
+ so there is no AlreadyRunningBackfill conflict between cases.
+ """
+ with dag_maker(
+ schedule=CronPartitionTimetable("0 0 * * *", timezone="Asia/Taipei",
run_offset=run_offset)
+ ) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse(from_date),
+ to_date=pendulum.parse(to_date),
+ max_active_runs=2,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+ query = (
+ select(DagRun)
+ .join(BackfillDagRun.dag_run)
+ .where(BackfillDagRun.backfill_id == b.id)
+ .order_by(BackfillDagRun.sort_ordinal)
+ )
+ labels = [
+
str(pendulum.instance(x.partition_date).in_timezone("Asia/Taipei").date())
+ for x in session.scalars(query).all()
+ ]
+ assert labels == expected_labels
+
+
+@pytest.mark.db_test
+def test_backfill_partitioned_offset_zero_behavior_unchanged(dag_maker,
session):
+ """offset==0: direct partition-axis enumeration still yields exactly the
three requested Asia/Taipei days, as the previous implementation did.
+
+ Regression guard that ensures offset==0 results are unchanged after the
refactor to
+ direct partition enumeration via timetable.iter_partition_dagrun_infos.
+ """
+ with dag_maker(schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei", run_offset=0)) as dag:
+ PythonOperator(task_id="hi", python_callable=print)
+ session.commit()
+
+ b = _create_backfill(
+ dag_id=dag.dag_id,
+ from_date=pendulum.parse("2026-02-18"),
+ to_date=pendulum.parse("2026-02-20"),
+ max_active_runs=5,
+ reverse=False,
+ triggering_user_name="pytest",
+ dag_run_conf={},
+ )
+ query = (
+ select(DagRun)
+ .join(BackfillDagRun.dag_run)
+ .where(BackfillDagRun.backfill_id == b.id)
+ .order_by(BackfillDagRun.sort_ordinal)
+ )
+ dag_runs = session.scalars(query).all()
+ partition_date_labels = [
+
str(pendulum.instance(x.partition_date).in_timezone("Asia/Taipei").date()) for
x in dag_runs
+ ]
+ assert partition_date_labels == ["2026-02-18", "2026-02-19", "2026-02-20"]
diff --git a/airflow-core/tests/unit/models/test_dag.py
b/airflow-core/tests/unit/models/test_dag.py
index 965b3be8898..73628b9919c 100644
--- a/airflow-core/tests/unit/models/test_dag.py
+++ b/airflow-core/tests/unit/models/test_dag.py
@@ -220,8 +220,8 @@ class TestDag:
assert dr is not None
# Serialized DAG should now exist and DagRun would be created
- ser = DBDagBag().get_latest_version_of_dag(dag_id, session=session)
- assert ser is not None
+ set = DBDagBag().get_latest_version_of_dag(dag_id, session=session)
+ assert set is not None
assert session.scalar(select(DagRun).where(DagRun.dag_id == dag_id))
is not None
@conf_vars({("core", "load_examples"): "false"})
@@ -3401,6 +3401,49 @@ def test_iter_dagrun_infos_between(start_date,
expected_infos):
assert expected_infos == list(iterator)
+def test_iter_dagrun_infos_between_partitioned_timetable():
+ """iter_dagrun_infos_between matches iter_partition_dagrun_infos for
partitioned timetables.
+
+ Verifies:
+ - Full-sequence equality: result matches direct
iter_partition_dagrun_infos call.
+ - to_date's calendar date is inclusive (half-open +1day upper bound makes
it so).
+ - to_date+1 is exclusive (partition tick for the day after to_date is
absent).
+ """
+ dag = DAG(
+ dag_id="test_iter_dagrun_infos_between_partitioned",
+ start_date=DEFAULT_DATE,
+ schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
+ )
+ EmptyOperator(task_id="dummy", dag=dag)
+ scheduler_dag = create_scheduler_dag(dag)
+
+ # Window: 3 days; to_dt sits late on 2026-03-12, so only its calendar date bounds the result.
+ from_dt = pendulum.datetime(2026, 3, 10, tz="UTC")
+ to_dt = pendulum.datetime(2026, 3, 12, 23, 59, 59, tz="UTC")
+
+ result = list(scheduler_dag.iter_dagrun_infos_between(earliest=from_dt,
latest=to_dt))
+
+ # Build expected sequence by calling iter_partition_dagrun_infos directly
with the same day-bounds.
+ core_timetable = scheduler_dag.timetable
+ expected = list(
+ core_timetable.iter_partition_dagrun_infos(
+ earliest_date=from_dt.date(),
+ latest_date=to_dt.date(),
+ )
+ )
+
+ # Full-sequence equality — not head/tail/length.
+ assert result == expected
+
+ # to_date's partition (2026-03-12) must be present (inclusive).
+ to_date_partition = pendulum.datetime(2026, 3, 12, tz="UTC")
+ assert any(info.run_after == to_date_partition for info in result)
+
+ # The partition for the day after to_date (2026-03-13) must be absent
(half-open upper bound).
+ after_to_date_partition = pendulum.datetime(2026, 3, 13, tz="UTC")
+ assert not any(info.run_after == after_to_date_partition for info in
result)
+
def test_iter_dagrun_infos_between_error(caplog):
start = pendulum.instance(DEFAULT_DATE - datetime.timedelta(hours=1))
end = pendulum.instance(DEFAULT_DATE)
diff --git a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
index ca6ada9dfb2..8df3b40a6d5 100644
--- a/airflow-core/tests/unit/timetables/test_trigger_timetable.py
+++ b/airflow-core/tests/unit/timetables/test_trigger_timetable.py
@@ -348,6 +348,30 @@ def test_cron_trigger_next_info_with_interval():
)
+def test_partition_key_uses_timetable_timezone():
+ """Regression: partition_key reflects the local partition date, not the
UTC instant.
+
+ For an Asia/Taipei daily timetable the 2026-02-15 partition fires at Taipei
+ midnight = 2026-02-14T16:00:00Z. partition_date stays that UTC instant,
but
+ partition_key must read as the local label "2026-02-15T00:00:00" rather
than
+ the UTC "2026-02-14T16:00:00".
+ """
+ timetable = CoreCronPartitionTimetable("0 0 * * *", timezone="Asia/Taipei")
+ info = timetable.next_dagrun_info_v2(
+ last_dagrun_info=None,
+ restriction=TimeRestriction(
+ earliest=pendulum.datetime(2026, 2, 14, 12, tz="Asia/Taipei"),
+ latest=None,
+ catchup=True,
+ ),
+ )
+ assert info is not None
+ # earliest is noon Taipei on 02-14, so the first tick is Taipei midnight 02-15; its key is the local label.
+ assert info.partition_key == "2026-02-15T00:00:00"
+ # partition_date stays the UTC instant of Taipei midnight — only the key
formatting changed.
+ assert info.partition_date == pendulum.datetime(2026, 2, 14, 16, 0, 0,
tz="UTC")
+
+
@pytest.mark.parametrize(
"timetable",
[
@@ -850,3 +874,217 @@ def test_dagruninfo_backward_compatibility() -> None:
assert info.partition_date is None
assert info.partition_key is None
+
+
+@pytest.mark.parametrize(
+ ("run_offset", "earliest_date", "latest_date", "expected_triples"),
+ [
+ pytest.param(
+ 0,
+ # "0 0 * * *" UTC+8 (Asia/Taipei): ticks at 16:00 UTC each day.
+ # earliest_date=2026-02-18 (local label) → inclusive UTC lower bound
2026-02-17T16:00Z
+ # latest_date=2026-02-20 (inclusive) → exclusive UTC upper bound
2026-02-20T16:00Z (Taipei 2026-02-21 00:00)
+ datetime.date(2026, 2, 18),
+ datetime.date(2026, 2, 20),
+ [
+ (
+ pendulum.datetime(2026, 2, 17, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 17, 16, tz="UTC"),
+ "2026-02-18T00:00:00",
+ ),
+ (
+ pendulum.datetime(2026, 2, 18, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 18, 16, tz="UTC"),
+ "2026-02-19T00:00:00",
+ ),
+ (
+ pendulum.datetime(2026, 2, 19, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 19, 16, tz="UTC"),
+ "2026-02-20T00:00:00",
+ ),
+ ],
+ id="offset_0_taipei_three_days",
+ ),
+ pytest.param(
+ 1,
+ # offset=1: run_offset does not shift run_after; it still equals partition_date.
+ # same three days as offset=0.
+ datetime.date(2026, 2, 18),
+ datetime.date(2026, 2, 20),
+ [
+ (
+ pendulum.datetime(2026, 2, 17, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 17, 16, tz="UTC"),
+ "2026-02-18T00:00:00",
+ ),
+ (
+ pendulum.datetime(2026, 2, 18, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 18, 16, tz="UTC"),
+ "2026-02-19T00:00:00",
+ ),
+ (
+ pendulum.datetime(2026, 2, 19, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 19, 16, tz="UTC"),
+ "2026-02-20T00:00:00",
+ ),
+ ],
+ id="offset_plus1_run_after_equals_partition_date",
+ ),
+ pytest.param(
+ -1,
+ # offset=-1: run_offset does not shift run_after; it still equals partition_date.
+ datetime.date(2026, 2, 18),
+ datetime.date(2026, 2, 20),
+ [
+ (
+ pendulum.datetime(2026, 2, 17, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 17, 16, tz="UTC"),
+ "2026-02-18T00:00:00",
+ ),
+ (
+ pendulum.datetime(2026, 2, 18, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 18, 16, tz="UTC"),
+ "2026-02-19T00:00:00",
+ ),
+ (
+ pendulum.datetime(2026, 2, 19, 16, tz="UTC"),
+ pendulum.datetime(2026, 2, 19, 16, tz="UTC"),
+ "2026-02-20T00:00:00",
+ ),
+ ],
+ id="offset_minus1_run_after_equals_partition_date",
+ ),
+ ],
+)
+def test_iter_partition_dagrun_infos_full_sequence(
+ run_offset, earliest_date, latest_date, expected_triples
+) -> None:
+ """iter_partition_dagrun_infos yields the exact (partition_date, run_after,
partition_key) sequence for the window.
+
+ Key behavioral invariant: run_after == partition_date for all offsets
(including non-zero).
+ partition_key is formatted in local timezone (Asia/Taipei), so the UTC
tick at 16:00Z
+ formats as the next calendar day's midnight.
+ """
+ timetable = CoreCronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei", run_offset=run_offset)
+ infos =
list(timetable.iter_partition_dagrun_infos(earliest_date=earliest_date,
latest_date=latest_date))
+ actual = [(info.partition_date, info.run_after, info.partition_key) for
info in infos]
+ assert actual == expected_triples
+ # Invariants for every tick: run_after == partition_date (all offsets) and
data_interval is None.
+ for info in infos:
+ assert info.run_after == info.partition_date, (
+ f"run_after {info.run_after!r} != partition_date
{info.partition_date!r} "
+ f"for run_offset={run_offset}"
+ )
+ assert info.data_interval is None
+
+
+def test_iter_partition_dagrun_infos_empty_window() -> None:
+ """An inverted window (earliest_date after latest_date) yields an empty sequence rather than raising."""
+ timetable = CoreCronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei", run_offset=0)
+ infos = list(
+ timetable.iter_partition_dagrun_infos(
+ earliest_date=datetime.date(2026, 2, 20),
+ latest_date=datetime.date(2026, 2, 18),
+ )
+ )
+ assert infos == []
+
+
+def test_iter_partition_dagrun_infos_endpoint_not_on_tick() -> None:
+ """A day window whose start (local midnight) is not itself a cron tick begins
at the first tick inside that day.
+
+ "0 6 * * *" Asia/Taipei ticks at 06:00 local = 22:00Z (previous UTC day).
+ resolve_day_bound(2026-02-18) = 2026-02-17T16:00Z (local midnight), which
is NOT a tick.
+ _align_to_next(2026-02-17T16:00Z) moves to 2026-02-17T22:00Z (first 06:00
Taipei tick).
+ """
+ timetable = CoreCronPartitionTimetable("0 6 * * *",
timezone="Asia/Taipei", run_offset=0)
+ infos = list(
+ timetable.iter_partition_dagrun_infos(
+ earliest_date=datetime.date(2026, 2, 18),
+ latest_date=datetime.date(2026, 2, 18),
+ )
+ )
+ # Exactly one tick on local 2026-02-18: 2026-02-17T22:00Z (Taipei 06:00), keyed by its local label.
+ assert len(infos) == 1
+ assert infos[0].partition_date == pendulum.datetime(2026, 2, 17, 22,
tz="UTC")
+ assert infos[0].partition_key == "2026-02-18T06:00:00"
+
+
+def test_iter_partition_dagrun_infos_inclusive_endpoint_pair() -> None:
+ """Both endpoints are inclusive local (Asia/Taipei) calendar dates.
+
+ latest_date=2026-02-18: window covers local day 2026-02-18 only (one tick).
+ latest_date=2026-02-19: window extends to include 2026-02-19 (two ticks).
+ The tick for the day after latest_date (2026-02-20) is absent.
+ """
+ timetable = CoreCronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei", run_offset=0)
+
+ infos_one = list(
+ timetable.iter_partition_dagrun_infos(
+ earliest_date=datetime.date(2026, 2, 18),
+ latest_date=datetime.date(2026, 2, 18),
+ )
+ )
+ assert len(infos_one) == 1
+ assert infos_one[0].partition_key == "2026-02-18T00:00:00"
+
+ # Extending latest_date by one calendar day adds exactly one more tick.
+ infos_two = list(
+ timetable.iter_partition_dagrun_infos(
+ earliest_date=datetime.date(2026, 2, 18),
+ latest_date=datetime.date(2026, 2, 19),
+ )
+ )
+ assert len(infos_two) == 2
+ assert [i.partition_key for i in infos_two] == ["2026-02-18T00:00:00",
"2026-02-19T00:00:00"]
+
+ # The tick for 2026-02-20 (the day after latest_date) is NOT included.
+ assert not any(i.partition_key == "2026-02-20T00:00:00" for i in infos_two)
+
+
+def test_iter_partition_dagrun_infos_taipei_local_midnight_vs_utc_day() ->
None:
+ """Asia/Taipei midnight partitions carry a partition_date on the prior UTC day.
+
+ Taipei (UTC+8) midnight 2026-02-18 00:00 local = 2026-02-17T16:00Z.
+ iter_partition_dagrun_infos must yield partition_date=2026-02-17T16:00Z and
+ partition_key="2026-02-18T00:00:00" (local label), demonstrating that the
UTC
+ instant and the local key correctly reflect the timezone relationship.
+ """
+ timetable = CoreCronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei", run_offset=0)
+ # Single calendar-day window for 2026-02-18 (local Taipei date).
+ infos = list(
+ timetable.iter_partition_dagrun_infos(
+ earliest_date=datetime.date(2026, 2, 18),
+ latest_date=datetime.date(2026, 2, 18),
+ )
+ )
+ assert len(infos) == 1
+ info = infos[0]
+ assert info.partition_date == pendulum.datetime(2026, 2, 17, 16, tz="UTC")
+ assert info.run_after == pendulum.datetime(2026, 2, 17, 16, tz="UTC")
+ assert info.partition_key == "2026-02-18T00:00:00"
+
+
+def test_iter_partition_dagrun_infos_dst_america_new_york_spring_forward() ->
None:
+ """America/New_York spring-forward (2026-03-08): no tick is lost or
duplicated.
+
+ Spring-forward 2026: clocks jump at 2026-03-08T02:00 ET (EST=UTC-5 →
EDT=UTC-4).
+ Midnight ET lies outside the skipped 02:00-03:00 hour, so this pins only the unaffected case:
+ - 2026-03-07 midnight ET = 2026-03-07T05:00Z
+ - 2026-03-08 midnight ET = 2026-03-08T05:00Z (no gap at midnight)
+ Both ticks must appear in the iteration; no duplicates.
+ """
+ timetable = CoreCronPartitionTimetable("0 0 * * *",
timezone="America/New_York", run_offset=0)
+ infos = list(
+ timetable.iter_partition_dagrun_infos(
+ earliest_date=datetime.date(2026, 3, 7),
+ latest_date=datetime.date(2026, 3, 8),
+ )
+ )
+ partition_keys = [i.partition_key for i in infos]
+ assert partition_keys == ["2026-03-07T00:00:00", "2026-03-08T00:00:00"]
+ # No duplicates.
+ assert len(partition_keys) == len(set(partition_keys))
+ # run_after == partition_date for both ticks.
+ for info in infos:
+ assert info.run_after == info.partition_date