Lee-W commented on code in PR #68702:
URL: https://github.com/apache/airflow/pull/68702#discussion_r3453607619
##########
airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py:
##########
@@ -150,6 +155,91 @@ def perform_clear_dag_run(
return dag_run_cleared
+_TI_CHUNK_SIZE = 500
+
+
+def clear_partition_fields(
+ *,
+ dag: SerializedDAG,
+ body: ClearPartitionsBody,
+ dag_id: str,
+ session: Session,
+) -> tuple[int, int]:
+ """
+ Reset partition_key and partition_date to None on matching runs.
+
+ Returns (dag_runs_cleared, task_instances_cleared).
+ Mirrors ``airflow partitions clear`` column-reset behavior.
+ """
+ stmt = select(DagRun).where(DagRun.dag_id == dag_id)
+ if body.run_id is not None:
+ stmt = stmt.where(DagRun.run_id == body.run_id)
+ elif body.partition_key is not None:
+ stmt = stmt.where(DagRun.partition_key == body.partition_key)
+ else:
+ stmt = stmt.where(or_(DagRun.partition_key.is_not(None),
DagRun.partition_date.is_not(None)))
+ if body.partition_date_start is not None:
+ lower =
dag.timetable.resolve_day_bound(body.partition_date_start.date())
+ stmt = stmt.where(DagRun.partition_date >= lower)
+ if body.partition_date_end is not None:
+ upper =
dag.timetable.resolve_day_bound(body.partition_date_end.date() +
timedelta(days=1))
+ stmt = stmt.where(DagRun.partition_date < upper)
Review Comment:
This date-window filter is now `DagRun.apply_partition_date_window`, shared by each component.
The service and CLI also share a single `DagRun.clear_partition_runs` core now,
so the two implementations won't drift.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]