This is an automated email from the ASF dual-hosted git repository.
Lee-W pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c6d6e007913 feat(cli): add `airflow dags clear` for partition-range
reprocessing (#66004)
c6d6e007913 is described below
commit c6d6e007913ad8708344fc7275bfcc4ff4632ed0
Author: Wei Lee <[email protected]>
AuthorDate: Mon May 25 23:26:42 2026 +0800
feat(cli): add `airflow dags clear` for partition-range reprocessing
(#66004)
---
airflow-core/src/airflow/cli/cli_config.py | 49 ++++
.../src/airflow/cli/commands/dag_command.py | 78 +++++-
.../tests/unit/cli/commands/test_dag_command.py | 308 ++++++++++++++++++++-
3 files changed, 432 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index df0c2c15fbe..b41ac481e0e 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -185,6 +185,30 @@ ARG_END_DATE = Arg(
),
type=parsedate,
)
+ARG_PARTITION_DATE_START = Arg(
+ ("--partition-date-start",),
+ help=(
+ "Inclusive lower bound of the partition_date window (matched against
DagRun.partition_date). "
+ "Accepts the same datetime formats as --start-date."
+ ),
+ type=parsedate,
+)
+ARG_PARTITION_DATE_END = Arg(
+ ("--partition-date-end",),
+ help=(
+ "Inclusive upper bound of the partition_date window (matched against
DagRun.partition_date). "
+ "Accepts the same datetime formats as --end-date."
+ ),
+ type=parsedate,
+)
+ARG_PARTITION_KEY = Arg(
+ ("--partition-key",),
+ help="Clear all Dag runs whose partition_key matches this exact value.",
+)
+ARG_CLEAR_RUN_ID = Arg(
+ ("--run-id",),
+ help="Clear the Dag run with this run_id.",
+)
ARG_OUTPUT_PATH = Arg(
(
"-o",
@@ -1101,6 +1125,31 @@ DAGS_COMMANDS = (
func=lazy_load_command("airflow.cli.commands.dag_command.dag_details"),
args=(ARG_DAG_ID, ARG_OUTPUT, ARG_VERBOSE),
),
+ ActionCommand(
+ name="clear",
+ help="Clear Dag runs selected by run_id, partition_key, or a
partition_date window",
+ description=(
+ "Clear Dag runs of the given dag_id and re-queue them for
reprocessing. Exactly one "
+ "of the following selectors must be provided: --run-id (single
run); --partition-key "
+ "(every run with that exact partition_key); or a partition_date
window via "
+ "--partition-date-start and/or --partition-date-end (inclusive on
both ends). "
+ "Intended for partitioned Dags, whose runs are keyed by
partition_date / "
+ "partition_key instead of logical_date. For traditional,
non-partitioned Dags, use "
+ "`airflow tasks clear --start-date / --end-date`."
+ ),
+ func=lazy_load_command("airflow.cli.commands.dag_command.dag_clear"),
+ args=(
+ ARG_DAG_ID,
+ ARG_CLEAR_RUN_ID,
+ ARG_PARTITION_KEY,
+ ARG_PARTITION_DATE_START,
+ ARG_PARTITION_DATE_END,
+ ARG_ONLY_FAILED,
+ ARG_ONLY_RUNNING,
+ ARG_YES,
+ ARG_VERBOSE,
+ ),
+ ),
ActionCommand(
name="list",
help="List all the DAGs",
diff --git a/airflow-core/src/airflow/cli/commands/dag_command.py
b/airflow-core/src/airflow/cli/commands/dag_command.py
index 13a4ad90596..3a28e4f1b20 100644
--- a/airflow-core/src/airflow/cli/commands/dag_command.py
+++ b/airflow-core/src/airflow/cli/commands/dag_command.py
@@ -47,7 +47,12 @@ from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.timetables.base import TimeRestriction
from airflow.utils import cli as cli_utils
-from airflow.utils.cli import get_bagged_dag, suppress_logs_and_warning,
validate_dag_bundle_arg
+from airflow.utils.cli import (
+ get_bagged_dag,
+ get_db_dag,
+ suppress_logs_and_warning,
+ validate_dag_bundle_arg,
+)
from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
from airflow.utils.helpers import ask_yesno
from airflow.utils.platform import getuser
@@ -117,6 +122,77 @@ def dag_delete(args) -> None:
print("Cancelled")
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def dag_clear(args, session: Session = NEW_SESSION) -> None:
+ """Clear Dag runs selected by run_id, partition_key, or a partition_date
window."""
+ has_range = args.partition_date_start is not None or
args.partition_date_end is not None
+ selectors_used = sum([args.run_id is not None, args.partition_key is not
None, has_range])
+ if selectors_used == 0:
+ raise SystemExit(
+ "One of --run-id, --partition-key, or --partition-date-start /
--partition-date-end "
+ "must be provided."
+ )
+ if selectors_used > 1:
+ raise SystemExit(
+ "--run-id, --partition-key, and --partition-date-start /
--partition-date-end are "
+ "mutually exclusive; provide exactly one selector."
+ )
+ if (
+ args.partition_date_start is not None
+ and args.partition_date_end is not None
+ and args.partition_date_start > args.partition_date_end
+ ):
+ raise SystemExit("--partition-date-start must be on or before
--partition-date-end.")
+
+ dag = get_db_dag(bundle_names=None, dag_id=args.dag_id)
+
+ query = select(DagRun.run_id, DagRun.partition_key,
DagRun.partition_date).where(
+ DagRun.dag_id == args.dag_id
+ )
+ if args.run_id is not None:
+ query = query.where(DagRun.run_id == args.run_id)
+ elif args.partition_key is not None:
+ query = query.where(DagRun.partition_key == args.partition_key)
+ else:
+ query = query.where(DagRun.partition_date.is_not(None))
+ if args.partition_date_start is not None:
+ query = query.where(DagRun.partition_date >=
args.partition_date_start)
+ if args.partition_date_end is not None:
+ query = query.where(DagRun.partition_date <=
args.partition_date_end)
+ query = query.order_by(DagRun.partition_date, DagRun.run_id)
+
+ runs = list(session.execute(query).all())
+ if not runs:
+ print("No matching Dag runs found.")
+ return
+
+ run_ids = [run.run_id for run in runs]
+ if not args.yes:
+ listing = "\n".join(
+ f" {run.run_id} partition_key={run.partition_key}
partition_date={run.partition_date}"
+ for run in runs
+ )
+ question = (
+ f"You are about to clear {len(runs)} Dag run(s) of
{args.dag_id!r}:\n"
+ f"{listing}\n\nAre you sure? [y/n]"
+ )
+ if not ask_yesno(question):
+ print("Cancelled, nothing was cleared.")
+ return
+
+ cleared = 0
+ for run_id in run_ids:
+ cleared += dag.clear(
+ run_id=run_id,
+ only_failed=args.only_failed,
+ only_running=args.only_running,
+ session=session,
+ )
+ print(f"Cleared {cleared} task instance(s) across {len(run_ids)} Dag
run(s).")
+
+
@cli_utils.action_cli
@providers_configuration_loaded
def dag_pause(args) -> None:
diff --git a/airflow-core/tests/unit/cli/commands/test_dag_command.py
b/airflow-core/tests/unit/cli/commands/test_dag_command.py
index 183e80ee216..37178a85ef5 100644
--- a/airflow-core/tests/unit/cli/commands/test_dag_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_dag_command.py
@@ -41,14 +41,16 @@ from airflow.exceptions import AirflowException
from airflow.models import DagModel, DagRun
from airflow.models.dagbag import DBDagBag
from airflow.models.serialized_dag import SerializedDagModel
+from airflow.models.taskinstance import TaskInstance
+from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.triggers.temporal import DateTimeTrigger,
TimeDeltaTrigger
-from airflow.sdk import DAG, BaseOperator, task
+from airflow.sdk import DAG, BaseOperator, CronPartitionTimetable, task
from airflow.sdk.definitions.dag import _run_inline_trigger
from airflow.sdk.execution_time.comms import _RequestFrame, _ResponseFrame
from airflow.serialization.serialized_objects import DagSerialization,
LazyDeserializedDAG
from airflow.triggers.base import TriggerEvent
from airflow.utils.session import create_session
-from airflow.utils.state import DagRunState
+from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from tests_common.test_utils.config import conf_vars
@@ -1158,6 +1160,308 @@ class TestCliDagsReserialize:
assert dag_processor_parsing_result.serialized_dags[0].hash ==
serialized_dag_hash[0]
+class TestCliDagsClear:
+ """Tests for the `airflow dags clear` partition-range subcommand."""
+
+ DAG_ID = "test_dags_clear_partitioned"
+
+ @pytest.fixture
+ def parser(self) -> argparse.ArgumentParser:
+ return cli_parser.get_parser()
+
+ @pytest.fixture(autouse=True)
+ def _clear_db(self):
+ yield
+ clear_db_runs()
+ clear_db_dags()
+
+ @pytest.fixture
+ def seeded_partitioned_runs(self, dag_maker):
+ with dag_maker(
+ self.DAG_ID,
+ schedule=CronPartitionTimetable("0 0 * * *",
timezone=pendulum.UTC),
+ start_date=datetime(2026, 3, 1, tzinfo=pendulum.UTC),
+ catchup=True,
+ serialized=True,
+ ):
+ EmptyOperator(task_id="t1")
+ # Three partitioned runs, plus one unpartitioned run that must never
be touched.
+ dag_maker.create_dagrun(
+ run_id="part_2026_03_08",
+ state=DagRunState.SUCCESS,
+ logical_date=None,
+ partition_date=datetime(2026, 3, 8, tzinfo=pendulum.UTC),
+ partition_key="2026-03-08T00:00:00",
+ )
+ dag_maker.create_dagrun(
+ run_id="part_2026_03_10",
+ state=DagRunState.FAILED,
+ logical_date=None,
+ partition_date=datetime(2026, 3, 10, tzinfo=pendulum.UTC),
+ partition_key="2026-03-10T00:00:00",
+ )
+ dag_maker.create_dagrun(
+ run_id="part_2026_03_14",
+ state=DagRunState.SUCCESS,
+ logical_date=None,
+ partition_date=datetime(2026, 3, 14, tzinfo=pendulum.UTC),
+ partition_key="2026-03-14T00:00:00",
+ )
+ dag_maker.create_dagrun(
+ run_id="non_partitioned",
+ state=DagRunState.SUCCESS,
+ logical_date=datetime(2026, 3, 9, tzinfo=pendulum.UTC),
+ partition_date=None,
+ )
+ dag_maker.sync_dagbag_to_db()
+
+ def _get_run_states(self):
+ with create_session() as session:
+ return {
+ row.run_id: row.state
+ for row in session.scalars(select(DagRun).where(DagRun.dag_id
== self.DAG_ID)).all()
+ }
+
+ def test_requires_a_selector(self, parser):
+ args = parser.parse_args(["dags", "clear", self.DAG_ID, "--yes"])
+ with pytest.raises(SystemExit, match="One of --run-id,
--partition-key"):
+ dag_command.dag_clear(args)
+
+ @pytest.mark.parametrize(
+ "extra_args",
+ [
+ pytest.param(
+ ["--run-id", "part_2026_03_10", "--partition-key",
"2026-03-10T00:00:00"],
+ id="run-id+partition-key",
+ ),
+ pytest.param(
+ ["--run-id", "part_2026_03_10", "--partition-date-start",
"2026-03-08T00:00:00"],
+ id="run-id+date-range",
+ ),
+ pytest.param(
+ ["--partition-key", "2026-03-10T00:00:00",
"--partition-date-end", "2026-03-14T00:00:00"],
+ id="partition-key+date-range",
+ ),
+ ],
+ )
+ def test_rejects_multiple_selectors(self, parser, extra_args):
+ args = parser.parse_args(["dags", "clear", self.DAG_ID, "--yes",
*extra_args])
+ with pytest.raises(SystemExit, match="mutually exclusive"):
+ dag_command.dag_clear(args)
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_rejects_inverted_window(self, parser):
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-date-start",
+ "2026-03-14T00:00:00",
+ "--partition-date-end",
+ "2026-03-08T00:00:00",
+ "--yes",
+ ]
+ )
+ with pytest.raises(SystemExit, match="--partition-date-start must be
on or before"):
+ dag_command.dag_clear(args)
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_clears_runs_in_window_inclusive(self, parser):
+ # Literal flag values from issue #65921: short ISO `YYYY-MM-DDTHH`
form.
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-date-start",
+ "2026-03-08T00",
+ "--partition-date-end",
+ "2026-03-14T23",
+ "--yes",
+ ]
+ )
+ dag_command.dag_clear(args)
+
+ states = self._get_run_states()
+ # Inclusive both ends: 03-08 and 03-14 boundary runs are cleared
(state -> QUEUED).
+ assert states["part_2026_03_08"] == DagRunState.QUEUED
+ assert states["part_2026_03_10"] == DagRunState.QUEUED
+ assert states["part_2026_03_14"] == DagRunState.QUEUED
+ # Run with NULL partition_date is never matched.
+ assert states["non_partitioned"] == DagRunState.SUCCESS
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_open_lower_bound(self, parser):
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-date-end",
+ "2026-03-09T00:00:00",
+ "--yes",
+ ]
+ )
+ dag_command.dag_clear(args)
+
+ states = self._get_run_states()
+ assert states["part_2026_03_08"] == DagRunState.QUEUED
+ assert states["part_2026_03_10"] == DagRunState.FAILED
+ assert states["part_2026_03_14"] == DagRunState.SUCCESS
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_open_upper_bound(self, parser):
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-date-start",
+ "2026-03-13T00:00:00",
+ "--yes",
+ ]
+ )
+ dag_command.dag_clear(args)
+
+ states = self._get_run_states()
+ assert states["part_2026_03_08"] == DagRunState.SUCCESS
+ assert states["part_2026_03_10"] == DagRunState.FAILED
+ assert states["part_2026_03_14"] == DagRunState.QUEUED
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_no_matching_runs_is_a_no_op(self, parser, capsys):
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-date-start",
+ "2027-01-01T00:00:00",
+ "--partition-date-end",
+ "2027-12-31T00:00:00",
+ "--yes",
+ ]
+ )
+ dag_command.dag_clear(args)
+ out = capsys.readouterr().out
+ assert "No matching Dag runs" in out
+ assert self._get_run_states()["part_2026_03_10"] == DagRunState.FAILED
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ @mock.patch("airflow.cli.commands.dag_command.ask_yesno",
return_value=False)
+ def test_prompt_decline_does_not_clear(self, mock_ask, parser):
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-date-start",
+ "2026-03-08T00:00:00",
+ "--partition-date-end",
+ "2026-03-14T00:00:00",
+ ]
+ )
+ dag_command.dag_clear(args)
+ mock_ask.assert_called_once()
+ states = self._get_run_states()
+ assert states["part_2026_03_08"] == DagRunState.SUCCESS
+ assert states["part_2026_03_10"] == DagRunState.FAILED
+ assert states["part_2026_03_14"] == DagRunState.SUCCESS
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_clears_by_run_id(self, parser):
+ args = parser.parse_args(["dags", "clear", self.DAG_ID, "--run-id",
"part_2026_03_10", "--yes"])
+ dag_command.dag_clear(args)
+
+ states = self._get_run_states()
+ assert states["part_2026_03_08"] == DagRunState.SUCCESS
+ assert states["part_2026_03_10"] == DagRunState.QUEUED
+ assert states["part_2026_03_14"] == DagRunState.SUCCESS
+ assert states["non_partitioned"] == DagRunState.SUCCESS
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_clears_by_partition_key(self, parser):
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-key",
+ "2026-03-10T00:00:00",
+ "--yes",
+ ]
+ )
+ dag_command.dag_clear(args)
+
+ states = self._get_run_states()
+ assert states["part_2026_03_08"] == DagRunState.SUCCESS
+ assert states["part_2026_03_10"] == DagRunState.QUEUED
+ assert states["part_2026_03_14"] == DagRunState.SUCCESS
+ assert states["non_partitioned"] == DagRunState.SUCCESS
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_run_id_not_found_is_a_no_op(self, parser, capsys):
+ args = parser.parse_args(["dags", "clear", self.DAG_ID, "--run-id",
"does_not_exist", "--yes"])
+ dag_command.dag_clear(args)
+ assert "No matching Dag runs" in capsys.readouterr().out
+ assert self._get_run_states()["part_2026_03_10"] == DagRunState.FAILED
+
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_only_failed_skips_non_failed_task_instances(self, parser):
+ # Explicitly set TI states so we can assert selectively.
+ # part_2026_03_10 has a FAILED DagRun; mark its single TI as FAILED.
+ # part_2026_03_08 has a SUCCESS DagRun; mark its single TI as SUCCESS.
+ with create_session() as session:
+ for run_id, ti_state in [
+ ("part_2026_03_08", TaskInstanceState.SUCCESS),
+ ("part_2026_03_10", TaskInstanceState.FAILED),
+ ]:
+ session.execute(
+ TaskInstance.__table__.update()
+ .where(TaskInstance.dag_id == self.DAG_ID)
+ .where(TaskInstance.run_id == run_id)
+ .values(state=ti_state)
+ )
+
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ self.DAG_ID,
+ "--partition-date-start",
+ "2026-03-08T00:00:00",
+ "--partition-date-end",
+ "2026-03-14T00:00:00",
+ "--only-failed",
+ "--yes",
+ ]
+ )
+ dag_command.dag_clear(args)
+
+ states = self._get_run_states()
+ # part_2026_03_10 had a FAILED TI — its run should be re-queued.
+ assert states["part_2026_03_10"] == DagRunState.QUEUED
+ # part_2026_03_08 had no FAILED TI — its run state must be unchanged.
+ assert states["part_2026_03_08"] == DagRunState.SUCCESS
+ # Non-partitioned run is always untouched.
+ assert states["non_partitioned"] == DagRunState.SUCCESS
+
+ def test_missing_dag_raises(self, parser):
+ args = parser.parse_args(
+ [
+ "dags",
+ "clear",
+ "does_not_exist",
+ "--partition-date-start",
+ "2026-03-08T00:00:00",
+ "--yes",
+ ]
+ )
+ with pytest.raises(AirflowException, match="could not be found in the
database"):
+ dag_command.dag_clear(args)
+
+
class TestDagDetailsIsBackfillable:
"""Tests for the is_backfillable computation in _get_dagbag_dag_details."""