Lee-W commented on code in PR #66004:
URL: https://github.com/apache/airflow/pull/66004#discussion_r3298270937


##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -117,6 +122,70 @@ def dag_delete(args) -> None:
         print("Cancelled")
 
 
+@cli_utils.action_cli
+@providers_configuration_loaded
+@provide_session
+def dag_clear(args, session: Session = NEW_SESSION) -> None:
+    """Clear Dag runs selected by run_id, partition_key, or a partition_date 
window."""
+    has_range = args.partition_date_start is not None or 
args.partition_date_end is not None
+    selectors_used = sum([args.run_id is not None, args.partition_key is not 
None, has_range])
+    if selectors_used == 0:
+        raise SystemExit(
+            "One of --run-id, --partition-key, or --partition-date-start / 
--partition-date-end "
+            "must be provided."
+        )
+    if selectors_used > 1:
+        raise SystemExit(
+            "--run-id, --partition-key, and --partition-date-start / 
--partition-date-end are "
+            "mutually exclusive; provide exactly one selector."
+        )
+    if (
+        args.partition_date_start is not None
+        and args.partition_date_end is not None
+        and args.partition_date_start > args.partition_date_end
+    ):
+        raise SystemExit("--partition-date-start must be on or before 
--partition-date-end.")
+
+    dag = get_db_dag(bundle_names=None, dag_id=args.dag_id)
+
+    query = select(DagRun).where(DagRun.dag_id == args.dag_id)
+    if args.run_id is not None:
+        query = query.where(DagRun.run_id == args.run_id)
+    elif args.partition_key is not None:
+        query = query.where(DagRun.partition_key == args.partition_key)
+    else:
+        query = query.where(DagRun.partition_date.is_not(None))
+        if args.partition_date_start is not None:
+            query = query.where(DagRun.partition_date >= 
args.partition_date_start)
+        if args.partition_date_end is not None:
+            query = query.where(DagRun.partition_date <= 
args.partition_date_end)
+    query = query.order_by(DagRun.partition_date, DagRun.run_id)
+
+    runs = list(session.scalars(query).all())
+    if not runs:
+        print("No matching Dag runs found.")
+        return
+
+    run_ids = [run.run_id for run in runs]
+    if not args.yes:
+        listing = "\n".join(
+            f"  {run.run_id}  partition_key={run.partition_key}  
partition_date={run.partition_date}"
+            for run in runs
+        )
+        question = (
+            f"You are about to clear {len(runs)} Dag run(s) of 
{args.dag_id!r}:\n"
+            f"{listing}\n\nAre you sure? [y/n]"
+        )
+        if not ask_yesno(question):
+            print("Cancelled, nothing was cleared.")
+            return
+
+    cleared = 0
+    for run_id in run_ids:
+        cleared += dag.clear(run_id=run_id, session=session)

Review Comment:
   I'm OK with creating it, but yes, it's not in the scope of this PR. I will 
create one and paste it here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to