This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 6783f4f7483 Fix airflow partitions clear date range for non-UTC
partitioned timetables (#68460)
6783f4f7483 is described below
commit 6783f4f7483b009b3d243684c90dc5779ca2875a
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 15 19:59:21 2026 +0800
Fix airflow partitions clear date range for non-UTC partitioned timetables
(#68460)
---
.../src/airflow/cli/commands/partition_command.py | 24 ++-
.../unit/cli/commands/test_partition_command.py | 227 +++++++++++++++++----
2 files changed, 208 insertions(+), 43 deletions(-)
diff --git a/airflow-core/src/airflow/cli/commands/partition_command.py
b/airflow-core/src/airflow/cli/commands/partition_command.py
index 5c9111f4fdf..e55ba02f665 100644
--- a/airflow-core/src/airflow/cli/commands/partition_command.py
+++ b/airflow-core/src/airflow/cli/commands/partition_command.py
@@ -18,6 +18,7 @@
from __future__ import annotations
+import datetime
from typing import TYPE_CHECKING
from sqlalchemy import or_, select
@@ -26,6 +27,7 @@ from airflow._shared.timezones.timezone import parse as
parsedate
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance, clear_task_instances
from airflow.utils import cli as cli_utils
+from airflow.utils.cli import get_db_dag
from airflow.utils.providers_configuration_loader import
providers_configuration_loaded
from airflow.utils.session import NEW_SESSION, provide_session
@@ -68,7 +70,15 @@ def _flush_buffer(
@providers_configuration_loaded
@provide_session
def clear(args, *, session: Session = NEW_SESSION) -> None:
- """Clear the partition_key and partition_date of matching DagRuns."""
+ """
+ Clear the partition_key and partition_date of matching DagRuns.
+
+ When a partition_date window is given, both bounds are **day-granular** and
+ anchored in the timetable's timezone for tz-aware partitioned timetables.
+ --start-date is the inclusive start local calendar day; --end-date is the
+ inclusive end local calendar day (any time-of-day or timezone-offset
+ component in either value is ignored; only the calendar date is used).
+ """
has_range = args.start_date is not None or args.end_date is not None or
args.date is not None
selectors_used = sum([args.run_id is not None, args.partition_key is not
None, has_range])
if selectors_used != 1:
@@ -97,10 +107,14 @@ def clear(args, *, session: Session = NEW_SESSION) -> None:
stmt = stmt.where(DagRun.partition_key == args.partition_key)
else:
stmt = stmt.where(or_(DagRun.partition_key.is_not(None),
DagRun.partition_date.is_not(None)))
- if args.start_date is not None:
- stmt = stmt.where(DagRun.partition_date >= args.start_date)
- if args.end_date is not None:
- stmt = stmt.where(DagRun.partition_date <= args.end_date)
+ if args.start_date is not None or args.end_date is not None:
+ dag = get_db_dag(bundle_names=None, dag_id=args.dag_id)
+ if args.start_date is not None:
+ lower = dag.timetable.resolve_day_bound(args.start_date.date())
+ stmt = stmt.where(DagRun.partition_date >= lower)
+ if args.end_date is not None:
+ upper = dag.timetable.resolve_day_bound(args.end_date.date() +
datetime.timedelta(days=1))
+ stmt = stmt.where(DagRun.partition_date < upper)
stmt = stmt.order_by(DagRun.partition_date, DagRun.run_id)
clear_tis = bool(args.clear_task_instances)
diff --git a/airflow-core/tests/unit/cli/commands/test_partition_command.py
b/airflow-core/tests/unit/cli/commands/test_partition_command.py
index 6548ef0aa84..27b999afa3c 100644
--- a/airflow-core/tests/unit/cli/commands/test_partition_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_partition_command.py
@@ -786,11 +786,12 @@ class TestPartitionsClear:
)
assert excinfo.value.code == "--date cannot be combined with
--start-date / --end-date."
- def test_date_range_end_date_is_literal(self, parser, dag_maker):
- """Pin literal <= semantics: --date right side is used as-is (no
end-of-day clamp).
+ def test_date_range_end_date_is_day_granular(self, parser, dag_maker):
+ """Pin day-granular semantics: --date right side covers the whole
local calendar day.
- '2026-01-02' parses to midnight 2026-01-02T00:00:00, so a run at 15:00
on
- the same day has partition_date > the bound and must NOT be cleared.
+ '2026-01-02~2026-01-02' resolves to [Jan 2 00:00Z, Jan 3 00:00Z), so
both
+ the midnight run and a 15:00 run on the same local day are cleared.
+ The time-of-day component of the input is ignored; only the calendar
date is used.
"""
dag_maker.create_dagrun(
run_id="part_run_2_midday_date",
@@ -814,22 +815,27 @@ class TestPartitionsClear:
)
)
- # Midnight run on exact date is matched (partition_date == bound).
+ # Midnight run on the date is cleared.
run_2 = _get_run("part_run_2")
assert run_2.partition_key is None
assert run_2.partition_date is None
- # 15:00 > midnight bound — NOT cleared.
+ # 15:00 on the same local day is also cleared (whole-day upper bound
Jan 3 midnight).
run_midday = _get_run("part_run_2_midday_date")
- assert run_midday.partition_key == "2026-01-02T15:00:00"
- assert run_midday.partition_date == datetime(2026, 1, 2, 15, 0, 0,
tzinfo=pendulum.UTC)
- # Runs outside the range are untouched.
+ assert run_midday.partition_key is None
+ assert run_midday.partition_date is None
+ # Runs outside the date range are untouched.
run_1 = _get_run("part_run_1")
assert run_1.partition_key == "2026-01-01T00:00:00"
run_3 = _get_run("part_run_3")
assert run_3.partition_key == "2026-01-03T00:00:00"
- def test_clear_with_datetime_end_date_no_clamp(self, parser, dag_maker):
- """Pin literal <= semantics with a datetime endpoint: runs after the
bound are excluded."""
+ def test_clear_datetime_input_time_component_ignored(self, parser,
dag_maker):
+ """Time-of-day component in --start-date / --end-date is stripped;
only the date is used.
+
+ --end-date 2026-01-02T10:00:00 strips to Jan 2, making the half-open
upper
+ bound Jan 3 00:00Z. Both the 10:00 and 15:00 runs on Jan 2 fall within
+ [start, Jan 3 00:00Z) and are cleared.
+ """
dag_maker.create_dagrun(
run_id="part_run_h10",
state=DagRunState.SUCCESS,
@@ -859,16 +865,22 @@ class TestPartitionsClear:
)
)
- # 10:00 is within the boundary (<=), must be cleared.
+ # 10:00 on Jan 2 is within [start, Jan 3 00:00Z) — cleared.
run_h10 = _get_run("part_run_h10")
assert run_h10.partition_key is None
assert run_h10.partition_date is None
- # 15:00 exceeds the un-clamped 10:00 boundary, must be untouched.
+ # 15:00 on Jan 2 is also within the half-open day bound — cleared.
run_h15 = _get_run("part_run_h15")
- assert run_h15.partition_key == "2026-01-02T15:00:00"
+ assert run_h15.partition_key is None
+ assert run_h15.partition_date is None
+
+ def test_clear_datetime_inputs_use_date_part_only(self, parser, dag_maker):
+ """Datetime --start-date / --end-date inputs are day-granular; the
time part is ignored.
- def test_clear_hourly_window_via_start_end_date(self, parser, dag_maker):
- """ISO datetime --start-date / --end-date selects only runs within the
exact window."""
+ --start-date 2026-01-02T03:00:00 strips to Jan 2 → lower = Jan 2
00:00Z.
+ --end-date 2026-01-02T10:00:00 strips to Jan 2 → upper = Jan 3
00:00Z.
+ All three runs on Jan 2 (02:00, 05:00, 11:00) fall within that window.
+ """
dag_maker.create_dagrun(
run_id="part_run_h02",
state=DagRunState.SUCCESS,
@@ -907,19 +919,14 @@ class TestPartitionsClear:
)
)
- # 02:00 is before the window start — untouched.
- run_h02 = _get_run("part_run_h02")
- assert run_h02.partition_key == "2026-01-02T02:00:00"
- # 05:00 is inside [03:00, 10:00] — cleared.
- run_h05 = _get_run("part_run_h05")
- assert run_h05.partition_key is None
- assert run_h05.partition_date is None
- # 11:00 is after the window end — untouched.
- run_h11b = _get_run("part_run_h11b")
- assert run_h11b.partition_key == "2026-01-02T11:00:00"
+ # All three runs on Jan 2 fall within [Jan 2 00:00Z, Jan 3 00:00Z) —
cleared.
+ for run_id in ("part_run_h02", "part_run_h05", "part_run_h11b"):
+ run = _get_run(run_id)
+ assert run.partition_key is None
+ assert run.partition_date is None
- def test_clear_via_date_range_with_datetime_endpoints(self, parser,
dag_maker):
- """--date with ISO datetime endpoints does not clamp the right side."""
+ def test_clear_via_date_range_datetime_endpoints_use_date_part_only(self,
parser, dag_maker):
+ """--date with ISO datetime endpoints strips the time part; the full
local day is covered."""
dag_maker.create_dagrun(
run_id="part_run_h02b",
state=DagRunState.SUCCESS,
@@ -943,6 +950,7 @@ class TestPartitionsClear:
)
dag_maker.sync_dagbag_to_db()
+ # Both sides strip to Jan 2 → window is [Jan 2 00:00Z, Jan 3 00:00Z).
partition_command.clear(
parser.parse_args(
[
@@ -956,13 +964,156 @@ class TestPartitionsClear:
)
)
- # 02:00 is before window start — untouched.
- run_h02b = _get_run("part_run_h02b")
- assert run_h02b.partition_key == "2026-01-02T02:00:00"
- # 05:00 is inside [03:00, 10:00] — cleared.
- run_h05b = _get_run("part_run_h05b")
- assert run_h05b.partition_key is None
- assert run_h05b.partition_date is None
- # 11:00 is after un-clamped 10:00 end — untouched.
- run_h11c = _get_run("part_run_h11c")
- assert run_h11c.partition_key == "2026-01-02T11:00:00"
+ # All three runs on Jan 2 are within the day window — cleared.
+ for run_id in ("part_run_h02b", "part_run_h05b", "part_run_h11c"):
+ run = _get_run(run_id)
+ assert run.partition_key is None
+ assert run.partition_date is None
+
+ TAIPEI_DAG_ID = "test_partitions_clear_taipei_dag"
+
+ @pytest.fixture
+ def seeded_taipei_runs(self, dag_maker, setup_partitioned_runs):
+ """Seed DagRuns for an Asia/Taipei (UTC+8) CronPartitionTimetable.
+
+ Depends on ``setup_partitioned_runs`` (the class-level fixture) so
that its
+ ``clear_db_runs()`` runs before — not after — the Taipei runs are
seeded.
+
+ Local midnight in Taipei is stored in partition_date as 16:00Z the previous day.
+ Written as explicit UTC instants so the oracle is independent of the
+ timetable under test:
+
+ local 2026-02-18 midnight → datetime(2026, 2, 17, 16, 0, 0, UTC)
+ local 2026-02-19 midnight → datetime(2026, 2, 18, 16, 0, 0, UTC)
+ local 2026-02-20 midnight → datetime(2026, 2, 19, 16, 0, 0, UTC)
(outside both --end-date windows)
+ """
+ clear_db_runs()
+ clear_db_dags()
+ with dag_maker(
+ self.TAIPEI_DAG_ID,
+ schedule=CronPartitionTimetable("0 0 * * *",
timezone="Asia/Taipei"),
+ start_date=datetime(2026, 2, 1, tzinfo=pendulum.UTC),
+ catchup=True,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t1")
+ runs = [
+ (
+ "taipei_2026_02_18",
+ datetime(2026, 2, 17, 16, 0, 0, tzinfo=pendulum.UTC),
+ "2026-02-18T00:00:00",
+ ),
+ (
+ "taipei_2026_02_19",
+ datetime(2026, 2, 18, 16, 0, 0, tzinfo=pendulum.UTC),
+ "2026-02-19T00:00:00",
+ ),
+ (
+ "taipei_2026_02_20",
+ datetime(2026, 2, 19, 16, 0, 0, tzinfo=pendulum.UTC),
+ "2026-02-20T00:00:00",
+ ),
+ ]
+ for run_id, partition_date, partition_key in runs:
+ dag_maker.create_dagrun(
+ run_id=run_id,
+ state=DagRunState.SUCCESS,
+ logical_date=None,
+ partition_date=partition_date,
+ partition_key=partition_key,
+ )
+ dag_maker.sync_dagbag_to_db()
+ yield
+ clear_db_runs()
+ clear_db_dags()
+
+ def _get_taipei_run_partition_dates(self) -> dict[str, datetime | None]:
+ with create_session() as session:
+ runs = session.scalars(select(DagRun).where(DagRun.dag_id ==
self.TAIPEI_DAG_ID)).all()
+ return {r.run_id: r.partition_date for r in runs}
+
+ @pytest.mark.usefixtures("seeded_taipei_runs")
+ def test_taipei_lower_bound_selects_correct_partition(self, parser):
+ """--start-date 2026-02-19 must match the run stored at 2026-02-18T16Z.
+
+ Without the timezone fix, parsedate("2026-02-19") yields
2026-02-19T00:00:00Z
+ under the UTC default timezone. The old filter compared
+ partition_date >= 2026-02-19T00:00Z; the run for local 2026-02-19 is
stored
+ as 2026-02-18T16:00Z, which is *before* that UTC boundary, so the run
would
+ be missed. Resolving through the timetable timezone fixes the
off-by-one:
+ the run at 2026-02-18T16Z is selected, the earlier run is not.
+ """
+ partition_command.clear(
+ parser.parse_args(
+ [
+ "partitions",
+ "clear",
+ "--dag-id",
+ self.TAIPEI_DAG_ID,
+ "--start-date",
+ "2026-02-19",
+ ]
+ )
+ )
+
+ dates = self._get_taipei_run_partition_dates()
+ # 2026-02-19 local midnight stored as 2026-02-18T16Z — must be cleared
(partition_date is None).
+ assert dates["taipei_2026_02_19"] is None
+ # 2026-02-20 local midnight stored as 2026-02-19T16Z — also in window
(no upper bound).
+ assert dates["taipei_2026_02_20"] is None
+ # 2026-02-18 local midnight stored as 2026-02-17T16Z — before the
start, must NOT be cleared.
+ assert dates["taipei_2026_02_18"] == datetime(2026, 2, 17, 16, 0, 0,
tzinfo=pendulum.UTC)
+
+ @pytest.mark.usefixtures("seeded_taipei_runs")
+ def test_taipei_upper_bound_at_cap(self, parser):
+ """--end-date 2026-02-19 must include the run stored at 2026-02-18T16Z
(at-cap).
+
+ The half-open upper bound is 2026-02-20 midnight Taipei =
2026-02-19T16Z, so
+ the run for local 2026-02-19 (stored at 2026-02-18T16Z) falls within
the window.
+ """
+ partition_command.clear(
+ parser.parse_args(
+ [
+ "partitions",
+ "clear",
+ "--dag-id",
+ self.TAIPEI_DAG_ID,
+ "--end-date",
+ "2026-02-19",
+ ]
+ )
+ )
+
+ dates = self._get_taipei_run_partition_dates()
+ # Both 2026-02-18 and 2026-02-19 local dates fall below the upper bound
2026-02-19T16Z (no lower bound) — cleared.
+ assert dates["taipei_2026_02_18"] is None
+ assert dates["taipei_2026_02_19"] is None
+ # 2026-02-20 local midnight (stored at 2026-02-19T16Z) equals the
upper bound — NOT cleared.
+ assert dates["taipei_2026_02_20"] == datetime(2026, 2, 19, 16, 0, 0,
tzinfo=pendulum.UTC)
+
+ @pytest.mark.usefixtures("seeded_taipei_runs")
+ def test_taipei_upper_bound_over_cap(self, parser):
+ """--end-date 2026-02-18 must NOT include the run stored at
2026-02-18T16Z (over-cap).
+
+ The half-open upper bound is 2026-02-19 midnight Taipei =
2026-02-18T16Z, so
+ the run for local 2026-02-19 (stored at exactly that UTC instant)
falls outside.
+ """
+ partition_command.clear(
+ parser.parse_args(
+ [
+ "partitions",
+ "clear",
+ "--dag-id",
+ self.TAIPEI_DAG_ID,
+ "--end-date",
+ "2026-02-18",
+ ]
+ )
+ )
+
+ dates = self._get_taipei_run_partition_dates()
+ # Only the 2026-02-18 local date run is within the window.
+ assert dates["taipei_2026_02_18"] is None
+ # 2026-02-19 (stored at 2026-02-18T16Z) equals the upper bound —
the filter uses strict <, so NOT cleared.
+ assert dates["taipei_2026_02_19"] == datetime(2026, 2, 18, 16, 0, 0,
tzinfo=pendulum.UTC)
+ assert dates["taipei_2026_02_20"] == datetime(2026, 2, 19, 16, 0, 0,
tzinfo=pendulum.UTC)