This is an automated email from the ASF dual-hosted git repository.

Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 07859aaf32f feat(cli): add `partitions clear` to reset DagRun 
partition fields (#66520)
07859aaf32f is described below

commit 07859aaf32ff23e244983a6f1b8bacb043f8f528
Author: Wei Lee <[email protected]>
AuthorDate: Wed May 27 19:54:10 2026 +0800

    feat(cli): add `partitions clear` to reset DagRun partition fields (#66520)
---
 airflow-core/src/airflow/cli/cli_config.py         |  73 ++
 .../src/airflow/cli/commands/partition_command.py  | 171 ++++
 .../unit/cli/commands/test_partition_command.py    | 968 +++++++++++++++++++++
 3 files changed, 1212 insertions(+)

diff --git a/airflow-core/src/airflow/cli/cli_config.py 
b/airflow-core/src/airflow/cli/cli_config.py
index 8e701db79bb..e9fd5e613a0 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -1048,6 +1048,49 @@ ARG_ASSET_NAME = Arg(("--name",), default="", 
help="Asset name")
 ARG_ASSET_URI = Arg(("--uri",), default="", help="Asset URI")
 ARG_ASSET_ALIAS = Arg(("--alias",), default=False, action="store_true", 
help="Show asset alias")
 
+# partitions clear
+ARG_PARTITIONS_CLEAR_DAG_ID = Arg(
+    ("-d", "--dag-id"),
+    help="The id of the Dag whose DagRun partition fields should be cleared",
+    required=True,
+)
+ARG_PARTITIONS_CLEAR_START_DATE = Arg(
+    ("-s", "--start-date"),
+    help="Inclusive lower bound of the partition_date window.",
+    type=parsedate,
+)
+ARG_PARTITIONS_CLEAR_END_DATE = Arg(
+    ("-e", "--end-date"),
+    help="Inclusive upper bound of the partition_date window.",
+    type=parsedate,
+)
+ARG_PARTITIONS_CLEAR_DRY_RUN = Arg(
+    ("--dry-run",),
+    help="Show which DagRuns would be cleared without modifying the database",
+    action="store_true",
+)
+ARG_PARTITIONS_CLEAR_PARTITION_KEY = Arg(
+    ("-k", "--partition-key"),
+    type=str,
+    help="Only clear DagRuns whose `partition_key` equals this exact value",
+)
+ARG_PARTITIONS_CLEAR_DATE_RANGE = Arg(
+    ("--date",),
+    type=str,
+    help=(
+        "Range expressed as 'a~b' (e.g. '2026-01-01~2026-01-31'); equivalent 
to "
+        "--start-date a --end-date b. Mutually exclusive with --start-date / 
--end-date."
+    ),
+)
+ARG_PARTITIONS_CLEAR_TASK_INSTANCES = Arg(
+    ("--clear-task-instances",),
+    help=(
+        "Also clear the matching DagRuns' task instances (resetting finished 
runs back to "
+        "QUEUED so they re-execute), in addition to clearing the partition 
fields."
+    ),
+    action="store_true",
+)
+
 ALTERNATIVE_CONN_SPECS_ARGS = [
     ARG_CONN_TYPE,
     ARG_CONN_DESCRIPTION,
@@ -1469,6 +1512,31 @@ TASKS_COMMANDS = (
         args=(ARG_DAG_ID, ARG_LOGICAL_DATE_OR_RUN_ID, ARG_OUTPUT, ARG_VERBOSE),
     ),
 )
+PARTITIONS_COMMANDS = (
+    ActionCommand(
+        name="clear",
+        help="Clear the partition_key and partition_date of one or more 
DagRuns",
+        description=(
+            "Clear the partition_key and partition_date columns on a Dag's 
DagRuns.\n"
+            "Either --run-id (single run), --partition-key (exact match), or a 
partition_date range "
+            "(--start-date/--end-date or --date a~b) is required.\n"
+            "Pass --clear-task-instances to additionally clear the matching 
DagRuns' "
+            "task instances so finished runs go back to QUEUED and re-execute."
+        ),
+        func=lazy_load_command("airflow.cli.commands.partition_command.clear"),
+        args=(
+            ARG_PARTITIONS_CLEAR_DAG_ID,
+            ARG_RUN_ID,
+            ARG_PARTITIONS_CLEAR_START_DATE,
+            ARG_PARTITIONS_CLEAR_END_DATE,
+            ARG_PARTITIONS_CLEAR_PARTITION_KEY,
+            ARG_PARTITIONS_CLEAR_DATE_RANGE,
+            ARG_PARTITIONS_CLEAR_TASK_INSTANCES,
+            ARG_PARTITIONS_CLEAR_DRY_RUN,
+            ARG_VERBOSE,
+        ),
+    ),
+)
 POOLS_COMMANDS = (
     ActionCommand(
         name="list",
@@ -2031,6 +2099,11 @@ core_commands: list[CLICommand] = [
         help="Manage backfills",
         subcommands=BACKFILL_COMMANDS,
     ),
+    GroupCommand(
+        name="partitions",
+        help="Manage Dag run partition metadata",
+        subcommands=PARTITIONS_COMMANDS,
+    ),
     GroupCommand(
         name="tasks",
         help="Manage tasks",
diff --git a/airflow-core/src/airflow/cli/commands/partition_command.py 
b/airflow-core/src/airflow/cli/commands/partition_command.py
new file mode 100644
index 00000000000..5c9111f4fdf
--- /dev/null
+++ b/airflow-core/src/airflow/cli/commands/partition_command.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+"""Partitions sub-commands."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from sqlalchemy import or_, select
+
+from airflow._shared.timezones.timezone import parse as parsedate
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance, clear_task_instances
+from airflow.utils import cli as cli_utils
+from airflow.utils.providers_configuration_loader import 
providers_configuration_loaded
+from airflow.utils.session import NEW_SESSION, provide_session
+
+if TYPE_CHECKING:
+    from sqlalchemy.orm import Session
+
+TI_CHUNK_SIZE = 500
+
+
+def _flush_buffer(
+    buffer: list[str],
+    carry: list[TaskInstance],
+    session: Session,
+    *,
+    drain: bool = False,
+) -> int:
+    """
+    Fetch TIs for buffered run_ids, extend carry, send full TI_CHUNK_SIZE 
slices.
+
+    If drain=True, also send the final partial slice (used at end of run).
+    Returns the total number of TIs sent to clear_task_instances by this call.
+    """
+    flushed = 0
+    if buffer:
+        chunk_tis = 
list(session.scalars(select(TaskInstance).where(TaskInstance.run_id.in_(buffer))))
+        buffer.clear()
+        carry.extend(chunk_tis)
+    while len(carry) >= TI_CHUNK_SIZE:
+        slice_tis = carry[:TI_CHUNK_SIZE]
+        del carry[:TI_CHUNK_SIZE]
+        clear_task_instances(slice_tis, session=session)
+        flushed += len(slice_tis)
+    if drain and carry:
+        clear_task_instances(carry, session=session)
+        flushed += len(carry)
+    return flushed
+
+
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def clear(args, *, session: Session = NEW_SESSION) -> None:
+    """Clear the partition_key and partition_date of matching DagRuns."""
+    has_range = args.start_date is not None or args.end_date is not None or 
args.date is not None
+    selectors_used = sum([args.run_id is not None, args.partition_key is not 
None, has_range])
+    if selectors_used != 1:
+        raise SystemExit(
+            "Specify exactly one of --run-id, --partition-key, or a 
partition_date range "
+            "(--start-date/--end-date or --date)."
+        )
+
+    if args.date is not None:
+        if args.start_date is not None or args.end_date is not None:
+            raise SystemExit("--date cannot be combined with --start-date / 
--end-date.")
+        raw = args.date
+        parts = raw.split("~", 1)
+        if len(parts) != 2 or not parts[0].strip() or not parts[1].strip():
+            raise SystemExit("--date must be in the form 'a~b', e.g. 
'2026-01-01~2026-01-31'.")
+        try:
+            args.start_date = parsedate(parts[0].strip())
+            args.end_date = parsedate(parts[1].strip())
+        except ValueError:
+            raise SystemExit("--date sides must be parseable as a date or 
datetime.")
+
+    stmt = select(DagRun).where(DagRun.dag_id == args.dag_id)
+    if args.run_id:
+        stmt = stmt.where(DagRun.run_id == args.run_id)
+    elif args.partition_key is not None:
+        stmt = stmt.where(DagRun.partition_key == args.partition_key)
+    else:
+        stmt = stmt.where(or_(DagRun.partition_key.is_not(None), 
DagRun.partition_date.is_not(None)))
+        if args.start_date is not None:
+            stmt = stmt.where(DagRun.partition_date >= args.start_date)
+        if args.end_date is not None:
+            stmt = stmt.where(DagRun.partition_date <= args.end_date)
+    stmt = stmt.order_by(DagRun.partition_date, DagRun.run_id)
+
+    clear_tis = bool(args.clear_task_instances)
+    cleared = 0
+    processed_any = False
+
+    # For --clear-task-instances: run_ids are buffered so that TIs are fetched 
with a single
+    # SELECT IN per chunk (avoiding N+1). The fetched TIs are then flushed to
+    # clear_task_instances in slices of TI_CHUNK_SIZE, batched by TI count 
rather than
+    # DagRun count. Any leftover TIs that do not fill a full slice are carried 
forward and
+    # combined with the next SELECT's results before the next set of slices is 
cut.
+    ti_buffer_run_ids: list[str] = []
+    ti_carry: list[TaskInstance] = []
+    tis_cleared_total = 0
+    runs_for_ti_total = 0
+    tis_dry_total = 0
+    runs_for_ti_dry = 0
+
+    for run in session.scalars(stmt).yield_per(100):
+        processed_any = True
+        fields_already_cleared = run.partition_key is None and 
run.partition_date is None
+        if fields_already_cleared and not clear_tis:
+            print(f"DagRun {run.run_id}: already cleared, skipping.")
+            continue
+        if not fields_already_cleared:
+            print(
+                f"DagRun {run.run_id}: "
+                f"partition_key={run.partition_key!r} -> None, "
+                f"partition_date={run.partition_date.isoformat() if 
run.partition_date else None} -> None"
+            )
+            if not args.dry_run:
+                run.partition_key = None
+                run.partition_date = None
+            cleared += 1
+        if clear_tis:
+            if args.dry_run:
+                run_tis = 
session.scalars(select(TaskInstance).where(TaskInstance.run_id == 
run.run_id)).all()
+                tis_dry_total += len(run_tis)
+                runs_for_ti_dry += 1
+            else:
+                ti_buffer_run_ids.append(run.run_id)
+                runs_for_ti_total += 1
+                if len(ti_buffer_run_ids) >= TI_CHUNK_SIZE:
+                    tis_cleared_total += _flush_buffer(ti_buffer_run_ids, 
ti_carry, session)
+
+    if not processed_any:
+        print(f"No matching DagRuns found for dag_id={args.dag_id}.")
+        return
+
+    # Flush the tail: fetch any remaining buffered run_ids, combine with 
carry, then
+    # cut full slices and send the final partial slice.
+    if clear_tis:
+        if args.dry_run:
+            print(
+                f"Dry run: would clear task instances on {runs_for_ti_dry} "
+                f"DagRun(s) ({tis_dry_total} task instance(s))."
+            )
+        else:
+            tis_cleared_total += _flush_buffer(ti_buffer_run_ids, ti_carry, 
session, drain=True)
+            print(
+                f"Cleared task instances on {runs_for_ti_total} "
+                f"DagRun(s) ({tis_cleared_total} task instance(s))."
+            )
+
+    if args.dry_run:
+        print(f"Dry run: would clear {cleared} DagRun(s). No changes written.")
+    else:
+        print(f"Cleared partition fields on {cleared} DagRun(s).")
diff --git a/airflow-core/tests/unit/cli/commands/test_partition_command.py 
b/airflow-core/tests/unit/cli/commands/test_partition_command.py
new file mode 100644
index 00000000000..6548ef0aa84
--- /dev/null
+++ b/airflow-core/tests/unit/cli/commands/test_partition_command.py
@@ -0,0 +1,968 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+from unittest import mock
+
+import pendulum
+import pytest
+from sqlalchemy import select
+
+from airflow.cli import cli_parser
+from airflow.cli.commands import partition_command
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import CronPartitionTimetable
+from airflow.utils.session import create_session
+from airflow.utils.state import DagRunState, TaskInstanceState
+
+from tests_common.test_utils.db import clear_db_dags, clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+
+DAG_ID = "test_partitions_clear_dag"
+
+
[email protected]
+def parser():
+    return cli_parser.get_parser()
+
+
[email protected]
+def setup_partitioned_runs(dag_maker):
+    clear_db_runs()
+    clear_db_dags()
+    with dag_maker(
+        DAG_ID,
+        schedule=CronPartitionTimetable("0 0 * * *", timezone=pendulum.UTC),
+        start_date=datetime(2026, 1, 1),
+        catchup=True,
+        serialized=True,
+    ):
+        EmptyOperator(task_id="t1")
+    dag_maker.create_dagrun(
+        run_id="part_run_1",
+        state=DagRunState.SUCCESS,
+        logical_date=None,
+        partition_date=datetime(2026, 1, 1, tzinfo=pendulum.UTC),
+        partition_key="2026-01-01T00:00:00",
+    )
+    dag_maker.create_dagrun(
+        run_id="part_run_2",
+        state=DagRunState.SUCCESS,
+        logical_date=None,
+        partition_date=datetime(2026, 1, 2, tzinfo=pendulum.UTC),
+        partition_key="2026-01-02T00:00:00",
+    )
+    dag_maker.create_dagrun(
+        run_id="part_run_3",
+        state=DagRunState.SUCCESS,
+        logical_date=None,
+        partition_date=datetime(2026, 1, 3, tzinfo=pendulum.UTC),
+        partition_key="2026-01-03T00:00:00",
+    )
+    dag_maker.sync_dagbag_to_db()
+    yield
+    clear_db_runs()
+    clear_db_dags()
+
+
+def _get_run(run_id: str) -> DagRun:
+    with create_session() as session:
+        run = session.scalar(select(DagRun).where(DagRun.run_id == run_id))
+    if run is None:
+        raise AssertionError(f"DagRun {run_id} not found")
+    return run
+
+
+def _set_tis_state(run_id: str, state: TaskInstanceState) -> None:
+    with create_session() as session:
+        tis = session.scalars(select(TaskInstance).where(TaskInstance.run_id 
== run_id)).all()
+        for ti in tis:
+            ti.state = state
+        session.commit()
+
+
+def _get_tis(run_id: str) -> list[TaskInstance]:
+    with create_session() as session:
+        return 
list(session.scalars(select(TaskInstance).where(TaskInstance.run_id == run_id)))
+
+
[email protected]("setup_partitioned_runs")
+class TestPartitionsClear:
+    def test_clear_single_run_id(self, parser, capsys):
+        partition_command.clear(
+            parser.parse_args(["partitions", "clear", "--dag-id", DAG_ID, 
"--run-id", "part_run_2"])
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_2: partition_key='2026-01-02T00:00:00' -> None, "
+            "partition_date=2026-01-02T00:00:00+00:00 -> None\n"
+            "Cleared partition fields on 1 DagRun(s).\n"
+        )
+
+        run_2 = _get_run("part_run_2")
+        assert run_2.partition_key is None
+        assert run_2.partition_date is None
+        # Other runs unchanged
+        run_1 = _get_run("part_run_1")
+        assert run_1.partition_key == "2026-01-01T00:00:00"
+        assert run_1.partition_date == datetime(2026, 1, 1, 
tzinfo=pendulum.UTC)
+
+    def test_clear_with_date_range(self, parser, capsys):
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--start-date",
+                    "2026-01-02",
+                    "--end-date",
+                    "2026-01-03",
+                ]
+            )
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_2: partition_key='2026-01-02T00:00:00' -> None, "
+            "partition_date=2026-01-02T00:00:00+00:00 -> None\n"
+            "DagRun part_run_3: partition_key='2026-01-03T00:00:00' -> None, "
+            "partition_date=2026-01-03T00:00:00+00:00 -> None\n"
+            "Cleared partition fields on 2 DagRun(s).\n"
+        )
+
+        # Out-of-range run untouched
+        run_1 = _get_run("part_run_1")
+        assert run_1.partition_key == "2026-01-01T00:00:00"
+        # In-range runs cleared
+        for run_id in ("part_run_2", "part_run_3"):
+            run = _get_run(run_id)
+            assert run.partition_key is None
+            assert run.partition_date is None
+
+    def test_dry_run_does_not_modify(self, parser, capsys):
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--start-date",
+                    "2026-01-01",
+                    "--dry-run",
+                ]
+            )
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_1: partition_key='2026-01-01T00:00:00' -> None, "
+            "partition_date=2026-01-01T00:00:00+00:00 -> None\n"
+            "DagRun part_run_2: partition_key='2026-01-02T00:00:00' -> None, "
+            "partition_date=2026-01-02T00:00:00+00:00 -> None\n"
+            "DagRun part_run_3: partition_key='2026-01-03T00:00:00' -> None, "
+            "partition_date=2026-01-03T00:00:00+00:00 -> None\n"
+            "Dry run: would clear 3 DagRun(s). No changes written.\n"
+        )
+
+        expected = {
+            "part_run_1": ("2026-01-01T00:00:00", datetime(2026, 1, 1, 
tzinfo=pendulum.UTC)),
+            "part_run_2": ("2026-01-02T00:00:00", datetime(2026, 1, 2, 
tzinfo=pendulum.UTC)),
+            "part_run_3": ("2026-01-03T00:00:00", datetime(2026, 1, 3, 
tzinfo=pendulum.UTC)),
+        }
+        for run_id, (expected_key, expected_date) in expected.items():
+            run = _get_run(run_id)
+            assert run.partition_key == expected_key
+            assert run.partition_date == expected_date
+
+    def test_no_match_prints_message(self, parser, capsys):
+        partition_command.clear(
+            parser.parse_args(["partitions", "clear", "--dag-id", DAG_ID, 
"--run-id", "does_not_exist"])
+        )
+        captured = capsys.readouterr()
+        assert captured.out == f"No matching DagRuns found for 
dag_id={DAG_ID}.\n"
+
+    def test_already_cleared_run_is_skipped(self, parser, capsys):
+        with create_session() as session:
+            run = session.scalar(select(DagRun).where(DagRun.run_id == 
"part_run_1"))
+            run.partition_key = None
+            run.partition_date = None
+            session.commit()
+
+        partition_command.clear(
+            parser.parse_args(["partitions", "clear", "--dag-id", DAG_ID, 
"--run-id", "part_run_1"])
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_1: already cleared, skipping.\nCleared partition 
fields on 0 DagRun(s).\n"
+        )
+
+    def test_requires_run_id_or_range(self, parser):
+        with pytest.raises(SystemExit) as excinfo:
+            partition_command.clear(parser.parse_args(["partitions", "clear", 
"--dag-id", DAG_ID]))
+        assert excinfo.value.code == (
+            "Specify exactly one of --run-id, --partition-key, or a 
partition_date range "
+            "(--start-date/--end-date or --date)."
+        )
+
+    def test_run_id_and_range_mutually_exclusive(self, parser):
+        with pytest.raises(SystemExit) as excinfo:
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        DAG_ID,
+                        "--run-id",
+                        "part_run_1",
+                        "--start-date",
+                        "2026-01-01",
+                    ]
+                )
+            )
+        assert excinfo.value.code == (
+            "Specify exactly one of --run-id, --partition-key, or a 
partition_date range "
+            "(--start-date/--end-date or --date)."
+        )
+
+    def test_prints_before_values(self, parser, capsys):
+        partition_command.clear(
+            parser.parse_args(["partitions", "clear", "--dag-id", DAG_ID, 
"--run-id", "part_run_1"])
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_1: partition_key='2026-01-01T00:00:00' -> None, "
+            "partition_date=2026-01-01T00:00:00+00:00 -> None\n"
+            "Cleared partition fields on 1 DagRun(s).\n"
+        )
+
+    def test_clear_task_instances_resets_run_and_tis(self, parser, capsys):
+        _set_tis_state("part_run_2", TaskInstanceState.SUCCESS)
+        before_clear_number = _get_run("part_run_2").clear_number
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--run-id",
+                    "part_run_2",
+                    "--clear-task-instances",
+                ]
+            )
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_2: partition_key='2026-01-02T00:00:00' -> None, "
+            "partition_date=2026-01-02T00:00:00+00:00 -> None\n"
+            "Cleared task instances on 1 DagRun(s) (1 task instance(s)).\n"
+            "Cleared partition fields on 1 DagRun(s).\n"
+        )
+
+        run_2 = _get_run("part_run_2")
+        assert run_2.partition_key is None
+        assert run_2.partition_date is None
+        assert run_2.state == DagRunState.QUEUED
+        assert run_2.clear_number == before_clear_number + 1
+        assert all(ti.state is None for ti in _get_tis("part_run_2"))
+
+        # Untargeted run is unaffected.
+        run_1 = _get_run("part_run_1")
+        assert run_1.partition_key == "2026-01-01T00:00:00"
+        assert run_1.state == DagRunState.SUCCESS
+
+    def test_clear_task_instances_dry_run_makes_no_changes(self, parser, 
capsys):
+        _set_tis_state("part_run_2", TaskInstanceState.SUCCESS)
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--run-id",
+                    "part_run_2",
+                    "--clear-task-instances",
+                    "--dry-run",
+                ]
+            )
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_2: partition_key='2026-01-02T00:00:00' -> None, "
+            "partition_date=2026-01-02T00:00:00+00:00 -> None\n"
+            "Dry run: would clear task instances on 1 DagRun(s) (1 task 
instance(s)).\n"
+            "Dry run: would clear 1 DagRun(s). No changes written.\n"
+        )
+
+        run_2 = _get_run("part_run_2")
+        assert run_2.partition_key == "2026-01-02T00:00:00"
+        assert run_2.partition_date == datetime(2026, 1, 2, 
tzinfo=pendulum.UTC)
+        assert run_2.state == DagRunState.SUCCESS
+        assert all(ti.state == TaskInstanceState.SUCCESS for ti in 
_get_tis("part_run_2"))
+
+    def test_clear_task_instances_on_already_cleared_run(self, parser, capsys):
+        # Partition fields already None, but the user still wants to re-run 
the DagRun.
+        with create_session() as session:
+            run = session.scalar(select(DagRun).where(DagRun.run_id == 
"part_run_1"))
+            run.partition_key = None
+            run.partition_date = None
+            session.commit()
+        _set_tis_state("part_run_1", TaskInstanceState.SUCCESS)
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--run-id",
+                    "part_run_1",
+                    "--clear-task-instances",
+                ]
+            )
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "Cleared task instances on 1 DagRun(s) (1 task instance(s)).\n"
+            "Cleared partition fields on 0 DagRun(s).\n"
+        )
+
+        run_1 = _get_run("part_run_1")
+        assert run_1.state == DagRunState.QUEUED
+        assert all(ti.state is None for ti in _get_tis("part_run_1"))
+
+    def test_clear_streams_runs_without_materialising(self, parser, dag_maker):
+        """Clearing 250 partitioned runs clears every one of them (streaming 
smoke test)."""
+        clear_db_runs()
+        clear_db_dags()
+        n = 250
+        with dag_maker(
+            "dag_stream_250",
+            schedule=CronPartitionTimetable("0 0 * * *", 
timezone=pendulum.UTC),
+            start_date=datetime(2024, 1, 1),
+            catchup=True,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t1")
+        for i in range(n):
+            dag_maker.create_dagrun(
+                run_id=f"stream_run_{i:04d}",
+                state=DagRunState.SUCCESS,
+                logical_date=None,
+                partition_date=datetime(2024, 1, 1, tzinfo=pendulum.UTC) + 
pendulum.duration(days=i),
+                partition_key=f"2024-key-{i:04d}",
+            )
+        dag_maker.sync_dagbag_to_db()
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    "dag_stream_250",
+                    "--start-date",
+                    "2024-01-01",
+                    "--end-date",
+                    "2025-12-31",
+                ]
+            )
+        )
+
+        with create_session() as session:
+            runs = list(
+                session.scalars(
+                    select(DagRun).where(DagRun.dag_id == 
"dag_stream_250").order_by(DagRun.run_id)
+                )
+            )
+
+        cleared_run_ids = [r.run_id for r in runs if r.partition_key is None 
and r.partition_date is None]
+        expected_run_ids = [f"stream_run_{i:04d}" for i in range(n)]
+        assert cleared_run_ids == expected_run_ids
+
+        clear_db_runs()
+        clear_db_dags()
+
+    def test_clear_task_instances_chunks_at_cap(self, parser, dag_maker):
+        """2 DRs × 3 TIs/DR = 6 TIs == TI_CHUNK_SIZE → clear_task_instances 
called once with all 6."""
+        ti_cap = 6
+        clear_db_runs()
+        clear_db_dags()
+        with dag_maker(
+            "dag_chunk_at_cap",
+            schedule=CronPartitionTimetable("0 0 * * *", 
timezone=pendulum.UTC),
+            start_date=datetime(2024, 1, 1),
+            catchup=True,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t1")
+            EmptyOperator(task_id="t2")
+            EmptyOperator(task_id="t3")
+        for i in range(2):
+            dag_maker.create_dagrun(
+                run_id=f"at_cap_run_{i:04d}",
+                state=DagRunState.SUCCESS,
+                logical_date=None,
+                partition_date=datetime(2024, 1, 1, tzinfo=pendulum.UTC) + 
pendulum.duration(days=i),
+                partition_key=f"at-cap-key-{i:04d}",
+            )
+        dag_maker.sync_dagbag_to_db()
+
+        with (
+            mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap),
+            mock.patch(
+                "airflow.cli.commands.partition_command.clear_task_instances", 
autospec=True
+            ) as mock_cti,
+        ):
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        "dag_chunk_at_cap",
+                        "--start-date",
+                        "2024-01-01",
+                        "--end-date",
+                        "2025-12-31",
+                        "--clear-task-instances",
+                    ]
+                )
+            )
+
+        # Exactly one call: 6 TIs == cap, all flushed together in the tail.
+        assert [len(c.args[0]) for c in mock_cti.mock_calls] == [6]
+
+        clear_db_runs()
+        clear_db_dags()
+
+    def test_clear_task_instances_chunks_over_cap(self, parser, dag_maker):
+        """3 DRs × 3 TIs/DR = 9 TIs > TI_CHUNK_SIZE=6 → two calls: [6, 3]."""
+        ti_cap = 6
+        clear_db_runs()
+        clear_db_dags()
+        with dag_maker(
+            "dag_chunk_over_cap",
+            schedule=CronPartitionTimetable("0 0 * * *", 
timezone=pendulum.UTC),
+            start_date=datetime(2024, 1, 1),
+            catchup=True,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t1")
+            EmptyOperator(task_id="t2")
+            EmptyOperator(task_id="t3")
+        for i in range(3):
+            dag_maker.create_dagrun(
+                run_id=f"over_cap_run_{i:04d}",
+                state=DagRunState.SUCCESS,
+                logical_date=None,
+                partition_date=datetime(2024, 1, 1, tzinfo=pendulum.UTC) + 
pendulum.duration(days=i),
+                partition_key=f"over-cap-key-{i:04d}",
+            )
+        dag_maker.sync_dagbag_to_db()
+
+        with (
+            mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap),
+            mock.patch(
+                "airflow.cli.commands.partition_command.clear_task_instances", 
autospec=True
+            ) as mock_cti,
+        ):
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        "dag_chunk_over_cap",
+                        "--start-date",
+                        "2024-01-01",
+                        "--end-date",
+                        "2025-12-31",
+                        "--clear-task-instances",
+                    ]
+                )
+            )
+
+        # First slice: 6 TIs at cap; tail: 3 remaining TIs.
+        assert [len(c.args[0]) for c in mock_cti.mock_calls] == [6, 3]
+
+        clear_db_runs()
+        clear_db_dags()
+
+    def test_clear_task_instances_chunks_just_under_cap(self, parser, 
dag_maker):
+        """1 DR × 5 TIs = 5 TIs < TI_CHUNK_SIZE=6 → one tail call with all 
5."""
+        ti_cap = 6
+        clear_db_runs()
+        clear_db_dags()
+        with dag_maker(
+            "dag_chunk_under_cap",
+            schedule=CronPartitionTimetable("0 0 * * *", 
timezone=pendulum.UTC),
+            start_date=datetime(2024, 1, 1),
+            catchup=True,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t1")
+            EmptyOperator(task_id="t2")
+            EmptyOperator(task_id="t3")
+            EmptyOperator(task_id="t4")
+            EmptyOperator(task_id="t5")
+        dag_maker.create_dagrun(
+            run_id="under_cap_run_0000",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2024, 1, 1, tzinfo=pendulum.UTC),
+            partition_key="under-cap-key-0000",
+        )
+        dag_maker.sync_dagbag_to_db()
+
+        with (
+            mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap),
+            mock.patch(
+                "airflow.cli.commands.partition_command.clear_task_instances", 
autospec=True
+            ) as mock_cti,
+        ):
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        "dag_chunk_under_cap",
+                        "--start-date",
+                        "2024-01-01",
+                        "--end-date",
+                        "2025-12-31",
+                        "--clear-task-instances",
+                    ]
+                )
+            )
+
+        # 5 TIs < cap of 6: no mid-loop flush triggered, tail sends all 5 in 
one call.
+        assert [len(c.args[0]) for c in mock_cti.mock_calls] == [5]
+
+        clear_db_runs()
+        clear_db_dags()
+
+    def test_clear_task_instances_chunks_mid_loop_trigger(self, parser, 
dag_maker):
+        """Pin mid-loop SELECT IN + slice trigger (partition_command.py 
L120-132).
+
+        Design: TI_CHUNK_SIZE=3, 1 dag with 2 tasks, 3 DRs (6 TIs total).
+
+        Execution trace:
+        - DR0: ti_buffer=[r0], len=1 < 3 -- no mid-loop
+        - DR1: ti_buffer=[r0, r1], len=2 < 3 -- no mid-loop
+        - DR2: ti_buffer=[r0, r1, r2], len=3 >= 3 -- mid-loop FIRES:
+            SELECT IN(r0, r1, r2) -> 6 TIs
+            ti_buffer_run_ids.clear() -> []
+            ti_carry.extend(6 TIs) -> carry len=6
+            while len>=3: slice[3] -> call #1, carry len=3
+            while len>=3: slice[3] -> call #2, carry len=0
+            while exits
+        - tail: ti_buffer empty, carry empty -- no calls
+        Expected mock_calls sizes: [3, 3]
+
+        Note: carry-across-SELECT (mid-loop leftover surviving to the next 
outer
+        fetch) cannot arise in a single-dag CLI invocation.  When mid-loop 
triggers,
+        SELECT returns tasks_per_DR * TI_CHUNK_SIZE TIs -- always a 
TI_CHUNK_SIZE
+        multiple -- so there is never a leftover after the inner while loop.
+        Carry leftover only arises in the tail flush, which is pinned by
+        test_clear_task_instances_chunks_over_cap.
+        """
+        ti_cap = 3
+        clear_db_runs()
+        clear_db_dags()
+        with dag_maker(
+            "dag_chunk_mid_loop",
+            schedule=CronPartitionTimetable("0 0 * * *", 
timezone=pendulum.UTC),
+            start_date=datetime(2024, 1, 1),
+            catchup=True,
+            serialized=True,
+        ):
+            EmptyOperator(task_id="t1")
+            EmptyOperator(task_id="t2")
+        for i in range(3):
+            dag_maker.create_dagrun(
+                run_id=f"mid_loop_run_{i:04d}",
+                state=DagRunState.SUCCESS,
+                logical_date=None,
+                partition_date=datetime(2024, 1, 1, tzinfo=pendulum.UTC) + 
pendulum.duration(days=i),
+                partition_key=f"mid-loop-key-{i:04d}",
+            )
+        dag_maker.sync_dagbag_to_db()
+
+        with (
+            mock.patch.object(partition_command, "TI_CHUNK_SIZE", ti_cap),
+            mock.patch(
+                "airflow.cli.commands.partition_command.clear_task_instances", 
autospec=True
+            ) as mock_cti,
+        ):
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        "dag_chunk_mid_loop",
+                        "--start-date",
+                        "2024-01-01",
+                        "--end-date",
+                        "2025-12-31",
+                        "--clear-task-instances",
+                    ]
+                )
+            )
+
+        # Mid-loop fires on DR2: two slices of 3 each; tail is empty.
+        assert [len(c.args[0]) for c in mock_cti.mock_calls] == [3, 3]
+
+        clear_db_runs()
+        clear_db_dags()
+
+    # ------------------------------------------------------------------
+    # --partition-key tests
+    # ------------------------------------------------------------------
+
+    def test_clear_by_partition_key(self, parser, capsys):
+        partition_command.clear(
+            parser.parse_args(
+                ["partitions", "clear", "--dag-id", DAG_ID, "--partition-key", 
"2026-01-02T00:00:00"]
+            )
+        )
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_2: partition_key='2026-01-02T00:00:00' -> None, "
+            "partition_date=2026-01-02T00:00:00+00:00 -> None\n"
+            "Cleared partition fields on 1 DagRun(s).\n"
+        )
+
+        run_2 = _get_run("part_run_2")
+        assert run_2.partition_key is None
+        assert run_2.partition_date is None
+        # Other runs unchanged
+        run_1 = _get_run("part_run_1")
+        assert run_1.partition_key == "2026-01-01T00:00:00"
+        assert run_1.partition_date == datetime(2026, 1, 1, 
tzinfo=pendulum.UTC)
+        run_3 = _get_run("part_run_3")
+        assert run_3.partition_key == "2026-01-03T00:00:00"
+        assert run_3.partition_date == datetime(2026, 1, 3, 
tzinfo=pendulum.UTC)
+
+    def test_clear_by_partition_key_no_match(self, parser, capsys):
+        partition_command.clear(
+            parser.parse_args(
+                ["partitions", "clear", "--dag-id", DAG_ID, "--partition-key", 
"nonexistent-key"]
+            )
+        )
+        captured = capsys.readouterr()
+        assert captured.out == f"No matching DagRuns found for 
dag_id={DAG_ID}.\n"
+
+        # All runs unchanged
+        for run_id, expected_key in [
+            ("part_run_1", "2026-01-01T00:00:00"),
+            ("part_run_2", "2026-01-02T00:00:00"),
+            ("part_run_3", "2026-01-03T00:00:00"),
+        ]:
+            run = _get_run(run_id)
+            assert run.partition_key == expected_key
+
+    def test_partition_key_mutually_exclusive_with_run_id(self, parser):
+        with pytest.raises(SystemExit) as excinfo:
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        DAG_ID,
+                        "--run-id",
+                        "part_run_1",
+                        "--partition-key",
+                        "foo",
+                    ]
+                )
+            )
+        assert excinfo.value.code == (
+            "Specify exactly one of --run-id, --partition-key, or a 
partition_date range "
+            "(--start-date/--end-date or --date)."
+        )
+
+    def test_partition_key_mutually_exclusive_with_date_range(self, parser):
+        with pytest.raises(SystemExit) as excinfo:
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        DAG_ID,
+                        "--partition-key",
+                        "foo",
+                        "--start-date",
+                        "2026-01-01",
+                    ]
+                )
+            )
+        assert excinfo.value.code == (
+            "Specify exactly one of --run-id, --partition-key, or a 
partition_date range "
+            "(--start-date/--end-date or --date)."
+        )
+
+    # ------------------------------------------------------------------
+    # --date a~b tests
+    # ------------------------------------------------------------------
+
+    @pytest.mark.parametrize(
+        "cli_args",
+        [
+            ["--start-date", "2026-01-02", "--end-date", "2026-01-03"],
+            ["--date", "2026-01-02~2026-01-03"],
+        ],
+    )
+    def test_date_range_syntax_equivalent_to_start_end(self, parser, capsys, 
cli_args):
+        partition_command.clear(parser.parse_args(["partitions", "clear", 
"--dag-id", DAG_ID, *cli_args]))
+        captured = capsys.readouterr()
+        assert captured.out == (
+            "DagRun part_run_2: partition_key='2026-01-02T00:00:00' -> None, "
+            "partition_date=2026-01-02T00:00:00+00:00 -> None\n"
+            "DagRun part_run_3: partition_key='2026-01-03T00:00:00' -> None, "
+            "partition_date=2026-01-03T00:00:00+00:00 -> None\n"
+            "Cleared partition fields on 2 DagRun(s).\n"
+        )
+
+    @pytest.mark.parametrize(
+        "bad_date",
+        [
+            "2026-01-01",  # no ~
+            "~2026-01-01",  # left side empty
+            "2026-01-01~",  # right side empty
+        ],
+    )
+    def test_date_range_syntax_invalid_format(self, parser, bad_date):
+        with pytest.raises(SystemExit) as excinfo:
+            partition_command.clear(
+                parser.parse_args(["partitions", "clear", "--dag-id", DAG_ID, 
"--date", bad_date])
+            )
+        assert "--date must be in the form 'a~b'" in str(excinfo.value.code)
+
+    def test_date_range_syntax_mutually_exclusive_with_start_end(self, parser):
+        with pytest.raises(SystemExit) as excinfo:
+            partition_command.clear(
+                parser.parse_args(
+                    [
+                        "partitions",
+                        "clear",
+                        "--dag-id",
+                        DAG_ID,
+                        "--date",
+                        "2026-01-01~2026-01-02",
+                        "--start-date",
+                        "2026-01-01",
+                    ]
+                )
+            )
+        assert excinfo.value.code == "--date cannot be combined with 
--start-date / --end-date."
+
+    def test_date_range_end_date_is_literal(self, parser, dag_maker):
+        """Pin literal <= semantics: --date right side is used as-is (no 
end-of-day clamp).
+
+        '2026-01-02' parses to midnight 2026-01-02T00:00:00, so a run at 15:00 
on
+        the same day has partition_date > the bound and must NOT be cleared.
+        """
+        dag_maker.create_dagrun(
+            run_id="part_run_2_midday_date",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 15, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T15:00:00",
+        )
+        dag_maker.sync_dagbag_to_db()
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--date",
+                    "2026-01-02~2026-01-02",
+                ]
+            )
+        )
+
+        # Midnight run on exact date is matched (partition_date == bound).
+        run_2 = _get_run("part_run_2")
+        assert run_2.partition_key is None
+        assert run_2.partition_date is None
+        # 15:00 > midnight bound — NOT cleared.
+        run_midday = _get_run("part_run_2_midday_date")
+        assert run_midday.partition_key == "2026-01-02T15:00:00"
+        assert run_midday.partition_date == datetime(2026, 1, 2, 15, 0, 0, 
tzinfo=pendulum.UTC)
+        # Runs outside the range are untouched.
+        run_1 = _get_run("part_run_1")
+        assert run_1.partition_key == "2026-01-01T00:00:00"
+        run_3 = _get_run("part_run_3")
+        assert run_3.partition_key == "2026-01-03T00:00:00"
+
+    def test_clear_with_datetime_end_date_no_clamp(self, parser, dag_maker):
+        """Pin literal <= semantics with a datetime endpoint: runs after the 
bound are excluded."""
+        dag_maker.create_dagrun(
+            run_id="part_run_h10",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 10, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T10:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="part_run_h15",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 15, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T15:00:00",
+        )
+        dag_maker.sync_dagbag_to_db()
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--end-date",
+                    "2026-01-02T10:00:00",
+                ]
+            )
+        )
+
+        # 10:00 is within the boundary (<=), must be cleared.
+        run_h10 = _get_run("part_run_h10")
+        assert run_h10.partition_key is None
+        assert run_h10.partition_date is None
+        # 15:00 exceeds the un-clamped 10:00 boundary, must be untouched.
+        run_h15 = _get_run("part_run_h15")
+        assert run_h15.partition_key == "2026-01-02T15:00:00"
+
+    def test_clear_hourly_window_via_start_end_date(self, parser, dag_maker):
+        """ISO datetime --start-date / --end-date selects only runs within the 
exact window."""
+        dag_maker.create_dagrun(
+            run_id="part_run_h02",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 2, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T02:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="part_run_h05",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 5, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T05:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="part_run_h11b",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 11, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T11:00:00",
+        )
+        dag_maker.sync_dagbag_to_db()
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--start-date",
+                    "2026-01-02T03:00:00",
+                    "--end-date",
+                    "2026-01-02T10:00:00",
+                ]
+            )
+        )
+
+        # 02:00 is before the window start — untouched.
+        run_h02 = _get_run("part_run_h02")
+        assert run_h02.partition_key == "2026-01-02T02:00:00"
+        # 05:00 is inside [03:00, 10:00] — cleared.
+        run_h05 = _get_run("part_run_h05")
+        assert run_h05.partition_key is None
+        assert run_h05.partition_date is None
+        # 11:00 is after the window end — untouched.
+        run_h11b = _get_run("part_run_h11b")
+        assert run_h11b.partition_key == "2026-01-02T11:00:00"
+
+    def test_clear_via_date_range_with_datetime_endpoints(self, parser, 
dag_maker):
+        """--date with ISO datetime endpoints does not clamp the right side."""
+        dag_maker.create_dagrun(
+            run_id="part_run_h02b",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 2, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T02:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="part_run_h05b",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 5, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T05:00:00",
+        )
+        dag_maker.create_dagrun(
+            run_id="part_run_h11c",
+            state=DagRunState.SUCCESS,
+            logical_date=None,
+            partition_date=datetime(2026, 1, 2, 11, 0, 0, tzinfo=pendulum.UTC),
+            partition_key="2026-01-02T11:00:00",
+        )
+        dag_maker.sync_dagbag_to_db()
+
+        partition_command.clear(
+            parser.parse_args(
+                [
+                    "partitions",
+                    "clear",
+                    "--dag-id",
+                    DAG_ID,
+                    "--date",
+                    "2026-01-02T03:00:00~2026-01-02T10:00:00",
+                ]
+            )
+        )
+
+        # 02:00 is before window start — untouched.
+        run_h02b = _get_run("part_run_h02b")
+        assert run_h02b.partition_key == "2026-01-02T02:00:00"
+        # 05:00 is inside [03:00, 10:00] — cleared.
+        run_h05b = _get_run("part_run_h05b")
+        assert run_h05b.partition_key is None
+        assert run_h05b.partition_date is None
+        # 11:00 is after un-clamped 10:00 end — untouched.
+        run_h11c = _get_run("part_run_h11c")
+        assert run_h11c.partition_key == "2026-01-02T11:00:00"

Reply via email to