Lee-W commented on code in PR #68280:
URL: https://github.com/apache/airflow/pull/68280#discussion_r3394672641
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -195,15 +199,47 @@ def dag_clear(args, *, session: Session = NEW_SESSION) ->
None:
print("Cancelled, nothing was cleared.")
return
+ cleared = _bulk_clear_runs(
+ args.dag_id,
+ run_ids,
+ only_failed=args.only_failed,
+ only_running=args.only_running,
+ session=session,
+ )
+ print(f"Cleared {cleared} task instance(s) across {len(run_ids)} Dag
run(s).")
+
+
+def _bulk_clear_runs(
+ dag_id: str,
+ run_ids: list[str],
+ only_failed: bool,
+ only_running: bool,
+ session: Session,
+) -> int:
+ """Clear task instances for the given run_ids in chunks instead of one
transaction per run."""
+ state_filter: list = []
+ if only_failed:
+ state_filter += [TaskInstanceState.FAILED,
TaskInstanceState.UPSTREAM_FAILED]
+ if only_running:
+ state_filter += [TaskInstanceState.RUNNING]
+
cleared = 0
- for run_id in run_ids:
- cleared += dag.clear(
- run_id=run_id,
- only_failed=args.only_failed,
- only_running=args.only_running,
- session=session,
+ for chunk_start in range(0, len(run_ids), _RUN_CHUNK_SIZE):
Review Comment:
nit: We can reuse `airflow.utils.helpers.chunks(run_ids, _RUN_CHUNK_SIZE)`
##########
airflow-core/tests/unit/cli/commands/test_dag_command.py:
##########
@@ -1959,6 +1966,143 @@ def test_asset_timetable_upper_bound_over_cap(self,
parser):
assert states["asset_2026_04_15"] == DagRunState.SUCCESS
assert states["asset_non_part"] == DagRunState.SUCCESS
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_clears_multiple_runs_in_one_batch(self, parser):
+ """3 runs fit in one chunk, so clear_task_instances is called once
(not N times)."""
+ from airflow.models.taskinstance import clear_task_instances
Review Comment:
Let's also move this import to the top level of the module.
##########
airflow-core/src/airflow/cli/commands/dag_command.py:
##########
@@ -195,15 +199,47 @@ def dag_clear(args, *, session: Session = NEW_SESSION) ->
None:
print("Cancelled, nothing was cleared.")
return
+ cleared = _bulk_clear_runs(
+ args.dag_id,
+ run_ids,
+ only_failed=args.only_failed,
+ only_running=args.only_running,
+ session=session,
+ )
+ print(f"Cleared {cleared} task instance(s) across {len(run_ids)} Dag
run(s).")
+
+
+def _bulk_clear_runs(
+ dag_id: str,
+ run_ids: list[str],
+ only_failed: bool,
+ only_running: bool,
+ session: Session,
+) -> int:
+ """Clear task instances for the given run_ids in chunks instead of one
transaction per run."""
+ state_filter: list = []
Review Comment:
```suggestion
state_filter: list[TaskInstanceState] = []
```
##########
airflow-core/tests/unit/cli/commands/test_dag_command.py:
##########
@@ -1959,6 +1966,143 @@ def test_asset_timetable_upper_bound_over_cap(self,
parser):
assert states["asset_2026_04_15"] == DagRunState.SUCCESS
assert states["asset_non_part"] == DagRunState.SUCCESS
+ @pytest.mark.usefixtures("seeded_partitioned_runs")
+ def test_clears_multiple_runs_in_one_batch(self, parser):
Review Comment:
I feel we can merge the next test into this one and check the chunking, state,
and clear_numbers at the same time.
The chunk check can be done through parametrization (`pytest.mark.parametrize`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]