This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6783f4f7483 Fix airflow partitions clear date range for non-UTC 
partitioned timetables (#68460)
6783f4f7483 is described below

commit 6783f4f7483b009b3d243684c90dc5779ca2875a
Author: Wei Lee <[email protected]>
AuthorDate: Mon Jun 15 19:59:21 2026 +0800

    Fix airflow partitions clear date range for non-UTC partitioned timetables 
(#68460)
---
 .../src/airflow/cli/commands/partition_command.py  |  24 ++-
 .../unit/cli/commands/test_partition_command.py    | 227 +++++++++++++++++----
 2 files changed, 208 insertions(+), 43 deletions(-)

diff --git a/airflow-core/src/airflow/cli/commands/partition_command.py 
b/airflow-core/src/airflow/cli/commands/partition_command.py
index 5c9111f4fdf..e55ba02f665 100644
--- a/airflow-core/src/airflow/cli/commands/partition_command.py
+++ b/airflow-core/src/airflow/cli/commands/partition_command.py
@@ -18,6 +18,7 @@
 
 from __future__ import annotations
 
+import datetime
 from typing import TYPE_CHECKING
 
 from sqlalchemy import or_, select
@@ -26,6 +27,7 @@ from airflow._shared.timezones.timezone import parse as 
parsedate
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance, clear_task_instances
 from airflow.utils import cli as cli_utils
+from airflow.utils.cli import get_db_dag
 from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
 from airflow.utils.session import NEW_SESSION, provide_session
 
@@ -68,7 +70,15 @@ def _flush_buffer(
 @providers_configuration_loaded
 @provide_session
 def clear(args, *, session: Session = NEW_SESSION) -> None:
-    """Clear the partition_key and partition_date of matching DagRuns."""
+    """
+    Clear the partition_key and partition_date of matching DagRuns.
+
+    When a partition_date window is given, both bounds are **day-granular** and
+    anchored in the timetable's timezone for tz-aware partitioned timetables.
+    --start-date is the inclusive start local calendar day; --end-date is the
+    inclusive end local calendar day (any time-of-day or timezone-offset
+    component in either value is ignored; only the calendar date is used).
+    """
     has_range = args.start_date is not None or args.end_date is not None or 
args.date is not None
     selectors_used = sum([args.run_id is not None, args.partition_key is not 
None, has_range])
     if selectors_used != 1:
@@ -97,10 +107,14 @@ def clear(args, *, session: Session = NEW_SESSION) -> None:
         stmt = stmt.where(DagRun.partition_key == args.partition_key)
     else:
         stmt = stmt.where(or_(DagRun.partition_key.is_not(None), 
DagRun.partition_date.is_not(None)))
-        if args.start_date is not None:
-            stmt = stmt.where(DagRun.partition_date >= args.start_date)
-        if args.end_date is not None:
-            stmt = stmt.where(DagRun.partition_date <= args.end_date)
+        if args.start_date is not None or args.end_date is not None:
+            dag = get_db_dag(bundle_names=None, dag_id=args.dag_id)
+            if args.start_date is not None:
+                lower = dag.timetable.resolve_day_bound(args.start_date.date())
+                stmt = stmt.where(DagRun.partition_date >= lower)
+            if args.end_date is not None:
+                upper = dag.timetable.resolve_day_bound(args.end_date.date() + 
datetime.timedelta(days=1))
+                stmt = stmt.where(DagRun.partition_date < upper)
     stmt = stmt.order_by(DagRun.partition_date, DagRun.run_id)
 
     clear_tis = bool(args.clear_task_instances)
diff --git a/airflow-core/tests/unit/cli/commands/test_partition_command.py 
b/airflow-core/tests/unit/cli/commands/test_partition_command.py
index 6548ef0aa84..27b999afa3c 100644
--- a/airflow-core/tests/unit/cli/commands/test_partition_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_partition_command.py
@@ -786,11 +786,12 @@ class TestPartitionsClear:
             )
         assert excinfo.value.code == "--date cannot be combined with 
--start-date / --end-date."
 
-    def test_date_range_end_date_is_literal(self, parser, dag_maker):
-        """Pin literal <= semantics: --date right side is used as-is (no 
end-of-day clamp).
+    def test_date_range_end_date_is_day_granular(self, parser, dag_maker):
+        """Pin day-granular semantics: --date right side covers the whole 
local calendar day.
 
-        '2026-01-02' parses to midnight 2026-01-02T00:00:00, so a run at 15:00 
on
-        the same day has partition_date > the bound and must NOT be cleared.
+        '2026-01-02~2026-01-02' resolves to [Jan 2 00:00Z, Jan 3 00:00Z), so 
both
+        the midnight run and a 15:00 run on the same local day are cleared.
+        The time-of-day component of the input is ignored; only the calendar 
date is used.
         """
         dag_maker.create_dagrun(
             run_id="part_run_2_midday_date",
@@ -814,22 +815,27 @@ class TestPartitionsClear:
             )
         )
 
-        # Midnight run on exact date is matched (partition_date == bound).
+        # Midnight run on the date is cleared.
         run_2 = _get_run("part_run_2")
         assert run_2.partition_key is None
         assert run_2.partition_date is None
-        # 15:00 > midnight bound — NOT cleared.
+        # 15:00 on the same local day is also cleared (whole-day upper bound 
Jan 3 midnight).
         run_midday = _get_run("part_run_2_midday_date")
-        assert run_midday.partition_key == "2026-01-02T15:00:00"
-        assert run_midday.partition_date == datetime(2026, 1, 2, 15, 0, 0, 
tzinfo=pendulum.UTC)
-        # Runs outside the range are untouched.
+        assert run_midday.partition_key is None
+        assert run_midday.partition_date is None
+        # Runs outside the date range are untouched.
         run_1 = _get_run("part_run_1")
         assert run_1.partition_key == "2026-01-01T00:00:00"
         run_3 = _get_run("part_run_3")
         assert run_3.partition_key == "2026-01-03T00:00:00"
 
-    def test_clear_with_datetime_end_date_no_clamp(self, parser, dag_maker):
-        """Pin literal <= semantics with a datetime endpoint: runs after the 
bound are excluded."""
+    def test_clear_datetime_input_time_component_ignored(self, parser, 
dag_maker):
+        """Time-of-day component in --start-date / --end-date is stripped; 
only the date is used.
+
+        --end-date 2026-01-02T10:00:00 strips to Jan 2, making the half-open 
upper
+        bound Jan 3 00:00Z.  Both the 10:00 and 15:00 runs on Jan 2 fall within
+        [start, Jan 3 00:00Z) and are cleared.
+        """
         dag_maker.create_dagrun(
             run_id="part_run_h10",
             state=DagRunState.SUCCESS,
@@ -859,16 +865,22 @@ class TestPartitionsClear:
             )
         )
 
-        # 10:00 is within the boundary (<=), must be cleared.
+        # 10:00 on Jan 2 is within [start, Jan 3 00:00Z) — cleared.
         run_h10 = _get_run("part_run_h10")
         assert run_h10.partition_key is None
         assert run_h10.partition_date is None
-        # 15:00 exceeds the un-clamped 10:00 boundary, must be untouched.
+        # 15:00 on Jan 2 is also within the half-open day bound — cleared.
         run_h15 = _get_run("part_run_h15")
-        assert run_h15.partition_key == "2026-01-02T15:00:00"
+        assert run_h15.partition_key is None
+        assert run_h15.partition_date is None
+
+    def test_clear_datetime_inputs_use_date_part_only(self, parser, dag_maker):
+        """Datetime --start-date / --end-date inputs are day-granular; the 
time part is ignored.
 
-    def test_clear_hourly_window_via_start_end_date(self, parser, dag_maker):
-        """ISO datetime --start-date / --end-date selects only runs within the 
exact window."""
+        --start-date 2026-01-02T03:00:00 strips to Jan 2 → lower = Jan 2 
00:00Z.
+        --end-date   2026-01-02T10:00:00 strips to Jan 2 → upper = Jan 3 
00:00Z.
+        All three runs on Jan 2 (02:00, 05:00, 11:00) fall within that window.
+        """
         dag_maker.create_dagrun(
             run_id="part_run_h02",
             state=DagRunState.SUCCESS,
@@ -907,19 +919,14 @@ class TestPartitionsClear:
             )
         )
 
-        # 02:00 is before the window start — untouched.
-        run_h02 = _get_run("part_run_h02")
-        assert run_h02.partition_key == "2026-01-02T02:00:00"
-        # 05:00 is inside [03:00, 10:00] — cleared.
-        run_h05 = _get_run("part_run_h05")
-        assert run_h05.partition_key is None
-        assert run_h05.partition_date is None
-        # 11:00 is after the window end — untouched.
-        run_h11b = _get_run("part_run_h11b")
-        assert run_h11b.partition_key == "2026-01-02T11:00:00"
+        # All three runs on Jan 2 fall within [Jan 2 00:00Z, Jan 3 00:00Z) — 
cleared.
+        for run_id in ("part_run_h02", "part_run_h05", "part_run_h11b"):
+            run = _get_run(run_id)
+            assert run.partition_key is None
+            assert run.partition_date is None
 
-    def test_clear_via_date_range_with_datetime_endpoints(self, parser, 
dag_maker):
-        """--date with ISO datetime endpoints does not clamp the right side."""
+    def test_clear_via_date_range_datetime_endpoints_use_date_part_only(self, 
parser, dag_maker):
+        """--date with ISO datetime endpoints strips the time part; the full 
local day is covered."""
         dag_maker.create_dagrun(
             run_id="part_run_h02b",
             state=DagRunState.SUCCESS,
@@ -943,6 +950,7 @@ class TestPartitionsClear:
         )
         dag_maker.sync_dagbag_to_db()
 
+        # Both sides strip to Jan 2 → window is [Jan 2 00:00Z, Jan 3 00:00Z).
         partition_command.clear(
             parser.parse_args(
                 [
@@ -956,13 +964,156 @@ class TestPartitionsClear:
             )
         )
 
-        # 02:00 is before window start — untouched.
-        run_h02b = _get_run("part_run_h02b")
-        assert run_h02b.partition_key == "2026-01-02T02:00:00"
-        # 05:00 is inside [03:00, 10:00] — cleared.
-        run_h05b = _get_run("part_run_h05b")
-        assert run_h05b.partition_key is None
-        assert run_h05b.partition_date is None
-        # 11:00 is after un-clamped 10:00 end — untouched.
-        run_h11c = _get_run("part_run_h11c")
-        assert run_h11c.partition_key == "2026-01-02T11:00:00"
+        # All three runs on Jan 2 are within the day window — cleared.
+        for run_id in ("part_run_h02b", "part_run_h05b", "part_run_h11c"):
+            run = _get_run(run_id)
+            assert run.partition_key is None
+            assert run.partition_date is None
+
+    TAIPEI_DAG_ID = "test_partitions_clear_taipei_dag"
+
+    @pytest.fixture
+    def seeded_taipei_runs(self, dag_maker, setup_partitioned_runs):
+        """Seed DagRuns for an Asia/Taipei (UTC+8) CronPartitionTimetable.
+
+        Depends on ``setup_partitioned_runs`` (the class-level fixture) so 
that its
+        ``clear_db_runs()`` runs before — not after — the Taipei runs are 
seeded.
+
+        Local midnight in Taipei is stored in partition_date as its UTC equivalent (8 hours earlier on the clock).
+        Written as explicit UTC instants so the oracle is independent of the
+        timetable under test:
+
+          local 2026-02-18 midnight  → datetime(2026, 2, 17, 16, 0, 0, UTC)
+          local 2026-02-19 midnight  → datetime(2026, 2, 18, 16, 0, 0, UTC)
+          local 2026-02-20 midnight  → datetime(2026, 2, 19, 16, 0, 0, UTC)  
(outside window)
+        """
+        clear_db_runs()
+        clear_db_dags()
+        with dag_maker(
+            self.TAIPEI_DAG_ID,
+            schedule=CronPartitionTimetable("0 0 * * *", 
timezone="Asia/Taipei"),
+            start_date=datetime(2026, 2, 1, tzinfo=pendulum.UTC),
+            catchup=True,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t1")
+        runs = [
+            (
+                "taipei_2026_02_18",
+                datetime(2026, 2, 17, 16, 0, 0, tzinfo=pendulum.UTC),
+                "2026-02-18T00:00:00",
+            ),
+            (
+                "taipei_2026_02_19",
+                datetime(2026, 2, 18, 16, 0, 0, tzinfo=pendulum.UTC),
+                "2026-02-19T00:00:00",
+            ),
+            (
+                "taipei_2026_02_20",
+                datetime(2026, 2, 19, 16, 0, 0, tzinfo=pendulum.UTC),
+                "2026-02-20T00:00:00",
+            ),
+        ]
+        for run_id, partition_date, partition_key in runs:
+            dag_maker.create_dagrun(
+                run_id=run_id,
+                state=DagRunState.SUCCESS,
+                logical_date=None,
+                partition_date=partition_date,
+                partition_key=partition_key,
+            )
+        dag_maker.sync_dagbag_to_db()
+        yield
+        clear_db_runs()
+        clear_db_dags()
+
+    def _get_taipei_run_partition_dates(self) -> dict[str, datetime | None]:
+        with create_session() as session:
+            runs = session.scalars(select(DagRun).where(DagRun.dag_id == 
self.TAIPEI_DAG_ID)).all()
+        return {r.run_id: r.partition_date for r in runs}
+
+    @pytest.mark.usefixtures("seeded_taipei_runs")
+    def test_taipei_lower_bound_selects_correct_partition(self, parser):
+        """--start-date 2026-02-19 must match the run stored at 2026-02-18T16Z.
+
+        Without the timezone fix, parsedate("2026-02-19") yields 
2026-02-19T00:00:00Z
+        under the UTC default timezone.  The old filter compared
+        partition_date >= 2026-02-19T00:00Z; the run for local 2026-02-19 is 
stored
+        as 2026-02-18T16:00Z, which is *before* that UTC boundary, so the run 
would
+        be missed.  Resolving through the timetable timezone fixes the 
off-by-one:
+        the run at 2026-02-18T16Z is selected, the earlier run is not.
+        """
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    self.TAIPEI_DAG_ID,
+                    "--start-date",
+                    "2026-02-19",
+                ]
+            )
+        )
+
+        dates = self._get_taipei_run_partition_dates()
+        # 2026-02-19 local midnight stored as 2026-02-18T16Z — must be cleared 
(partition_date is None).
+        assert dates["taipei_2026_02_19"] is None
+        # 2026-02-20 local midnight stored as 2026-02-19T16Z — also in window 
(no upper bound).
+        assert dates["taipei_2026_02_20"] is None
+        # 2026-02-18 local midnight stored as 2026-02-17T16Z — before the 
start, must NOT be cleared.
+        assert dates["taipei_2026_02_18"] == datetime(2026, 2, 17, 16, 0, 0, 
tzinfo=pendulum.UTC)
+
+    @pytest.mark.usefixtures("seeded_taipei_runs")
+    def test_taipei_upper_bound_at_cap(self, parser):
+        """--end-date 2026-02-19 must include the run stored at 2026-02-18T16Z 
(at-cap).
+
+        The half-open upper bound is 2026-02-20 midnight Taipei = 
2026-02-19T16Z, so
+        the run for local 2026-02-19 (stored at 2026-02-18T16Z) falls within 
the window.
+        """
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    self.TAIPEI_DAG_ID,
+                    "--end-date",
+                    "2026-02-19",
+                ]
+            )
+        )
+
+        dates = self._get_taipei_run_partition_dates()
+        # Both 2026-02-18 and 2026-02-19 local dates are within [start, Feb 19 
16Z).
+        assert dates["taipei_2026_02_18"] is None
+        assert dates["taipei_2026_02_19"] is None
+        # 2026-02-20 local midnight (stored at 2026-02-19T16Z) equals the 
upper bound — NOT cleared.
+        assert dates["taipei_2026_02_20"] == datetime(2026, 2, 19, 16, 0, 0, 
tzinfo=pendulum.UTC)
+
+    @pytest.mark.usefixtures("seeded_taipei_runs")
+    def test_taipei_upper_bound_over_cap(self, parser):
+        """--end-date 2026-02-18 must NOT include the run stored at 
2026-02-18T16Z (over-cap).
+
+        The half-open upper bound is 2026-02-19 midnight Taipei = 
2026-02-18T16Z, so
+        the run for local 2026-02-19 (stored at exactly that UTC instant) 
falls outside.
+        """
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    self.TAIPEI_DAG_ID,
+                    "--end-date",
+                    "2026-02-18",
+                ]
+            )
+        )
+
+        dates = self._get_taipei_run_partition_dates()
+        # Only the 2026-02-18 local date run is within the window.
+        assert dates["taipei_2026_02_18"] is None
+        # 2026-02-19 (stored at 2026-02-18T16Z) equals the upper bound — 
strictly less than, NOT cleared.
+        assert dates["taipei_2026_02_19"] == datetime(2026, 2, 18, 16, 0, 0, 
tzinfo=pendulum.UTC)
+        assert dates["taipei_2026_02_20"] == datetime(2026, 2, 19, 16, 0, 0, 
tzinfo=pendulum.UTC)

Reply via email to