dkranchii opened a new pull request, #68241:
URL: https://github.com/apache/airflow/pull/68241

   ## Summary
   
   `SchedulerJobRunner._remove_unreferenced_triggers` runs as a synchronous 
`call_regular_interval` callback every `parsing_cleanup_interval` (default 60s) 
and issues a single unbounded `DELETE FROM trigger WHERE NOT EXISTS (...)` with 
`synchronize_session="fetch"`. On busy installs, that statement holds row locks 
on `trigger` for the entire transaction, blocking the triggerer's own 
`Trigger.clean_unused` and `submit_event` writers, and stalls the scheduler 
main loop while many rows are removed. `synchronize_session="fetch"` also makes 
SQLAlchemy fetch the primary key of every match (a `SELECT id` before the 
`DELETE`, or `RETURNING` where the backend supports it), materialising every 
matching ID in Python memory.
   
   This PR switches the cleanup to a **select-IDs-with-LIMIT + delete-by-IDs + 
commit-between-batches** loop, mirroring the established pattern in 
`airflow.utils.db_cleanup._do_delete` and `airflow.state.metastore.cleanup`. 
Batch size is tunable via the new `[scheduler] 
unreferenced_triggers_cleanup_batch_size` option (default `500`); set to `0` to 
restore the previous single-statement behaviour.
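
   A minimal sketch of the select-IDs-with-LIMIT + delete-by-IDs + commit-between-batches loop, using the standard-library `sqlite3` module instead of Airflow's SQLAlchemy session. The schema is a hypothetical simplification (only `id`/`trigger_id` columns), and `remove_unreferenced_triggers_batched` and `build_demo_db` are illustrative names, not the code in this PR:

```python
import sqlite3

# Hypothetical predicate shared by the SELECT and the DELETE. "trigger" is quoted
# because TRIGGER is an SQL keyword; NOT EXISTS tolerates NULL trigger_id values.
UNREFERENCED = (
    'NOT EXISTS (SELECT 1 FROM task_instance ti WHERE ti.trigger_id = "trigger".id) '
    'AND NOT EXISTS (SELECT 1 FROM asset_watcher aw WHERE aw.trigger_id = "trigger".id)'
)


def remove_unreferenced_triggers_batched(conn: sqlite3.Connection, batch_size: int) -> int:
    """Delete unreferenced triggers and return how many commits were issued.

    batch_size <= 0 falls back to one unbounded DELETE (the pre-PR behaviour).
    """
    if batch_size <= 0:
        conn.execute(f'DELETE FROM "trigger" WHERE {UNREFERENCED}')
        conn.commit()
        return 1

    commits = 0
    while True:
        # 1. Select a bounded batch of IDs so each transaction stays short.
        ids = [
            row[0]
            for row in conn.execute(
                f'SELECT id FROM "trigger" WHERE {UNREFERENCED} ORDER BY id LIMIT ?',
                (batch_size,),
            )
        ]
        if not ids:
            break
        # 2. Delete by ID, re-checking the filter in case a reference appeared.
        placeholders = ", ".join("?" * len(ids))
        conn.execute(
            f'DELETE FROM "trigger" WHERE id IN ({placeholders}) AND {UNREFERENCED}',
            ids,
        )
        # 3. Commit between batches so each transaction (and its locks) ends
        #    after one batch instead of after the whole cleanup.
        conn.commit()
        commits += 1
    return commits


def build_demo_db(unreferenced: int, referenced: int) -> sqlite3.Connection:
    """Hypothetical fixture: the first `referenced` triggers are pointed at by a
    task instance or an asset watcher; the remaining `unreferenced` ones are orphans."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        'CREATE TABLE "trigger" (id INTEGER PRIMARY KEY);'
        "CREATE TABLE task_instance (id INTEGER PRIMARY KEY, trigger_id INTEGER);"
        "CREATE TABLE asset_watcher (id INTEGER PRIMARY KEY, trigger_id INTEGER);"
    )
    total = referenced + unreferenced
    conn.executemany('INSERT INTO "trigger" (id) VALUES (?)', [(i,) for i in range(1, total + 1)])
    for i in range(1, referenced + 1):
        table = "task_instance" if i % 2 else "asset_watcher"
        conn.execute(f"INSERT INTO {table} (trigger_id) VALUES (?)", (i,))
    # A task instance without a trigger must not stop the cleanup.
    conn.execute("INSERT INTO task_instance (trigger_id) VALUES (NULL)")
    conn.commit()
    return conn


conn = build_demo_db(unreferenced=5, referenced=2)
print(remove_unreferenced_triggers_batched(conn, batch_size=2))  # 3
```

   The `NOT EXISTS` form (rather than `NOT IN`) keeps task instances with a `NULL` `trigger_id` from turning the whole predicate into `NULL`. Re-applying the filter in the `DELETE` is a choice made in this sketch, so that a trigger which gains a reference between the `SELECT` and the `DELETE` is kept; it is not a claim about the PR's exact statement.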
   
   This is exactly the case that the recently added [AGENTS.md rule about 
batched bulk DELETE/UPDATE in scheduler interval 
callbacks](https://github.com/apache/airflow/blob/main/AGENTS.md#coding-standards) 
calls out.
   
   ## Changes
   
   - `airflow-core/src/airflow/jobs/scheduler_job_runner.py` — 
`_remove_unreferenced_triggers` rewritten as a LIMIT-bounded loop with a commit 
per batch. `synchronize_session="fetch"` dropped (we now select the IDs 
explicitly).
   - `airflow-core/src/airflow/config_templates/config.yml` — new `[scheduler] 
unreferenced_triggers_cleanup_batch_size` (default `500`, `version_added: 
3.3.0`).
   - `airflow-core/tests/unit/jobs/test_scheduler_job.py` — new 
`test_remove_unreferenced_triggers_batches_deletes` asserts batching: 5 
unreferenced triggers + batch size 2 → 3 commits.
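
   The expected commit count in that test follows from draining 5 rows 2 at a time. A tiny hypothetical helper (`batch_plan`, not part of the PR) makes the arithmetic explicit, assuming one commit per non-empty batch:

```python
def batch_plan(unreferenced: int, batch_size: int) -> list[int]:
    """Rows deleted per committed batch when `unreferenced` orphan triggers are
    drained `batch_size` at a time (hypothetical helper; batch_size must be > 0)."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive for a batched plan")
    full, rest = divmod(unreferenced, batch_size)
    # Full batches first, then one partial batch for any remainder.
    return [batch_size] * full + ([rest] if rest else [])


print(batch_plan(5, 2))  # prints [2, 2, 1], i.e. 3 commits
```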
   
   The supporting indexes on the filter columns already exist 
(`task_instance.ti_trigger_id`, `asset_watcher.idx_awm_trigger_id`).
   
   ## Follow-up
   
   The parallel `Trigger.clean_unused()` path in the triggerer loop has the 
same anti-pattern and will get the same treatment in a follow-up; tracked at 
<PASTE_TRACKING_ISSUE_URL>.
   
   ## Test plan
   
   - [x] New unit test `test_remove_unreferenced_triggers_batches_deletes` 
covers the batching contract.
   - [x] Existing `test_delete_unreferenced_triggers` and 
`test_delete_unreferenced_triggers_with_null_trigger_id_ti` keep passing 
(semantics unchanged at default batch size).
   - [ ] Local validation will be done on a separate machine; relying on CI 
here for the broader matrix.
   
   ---
   
   ##### Was generative AI tooling used to co-author this PR?
   
   - [X] Yes — Claude
   

