zhenghanyang opened a new issue, #37059:
URL: https://github.com/apache/airflow/issues/37059
### Apache Airflow version
2.8.1
### If "Other Airflow 2 version" selected, which one?
_No response_
### What happened?
When start more than 20 thousands trigger jobs, the triggerer may report
following error and shutdown.
```
[2024-01-28T21:14:50.981+0800] {triggerer_job_runner.py:481} INFO - 20373
triggers currently running
[2024-01-28T21:14:51.451+0800] {triggerer_job_runner.py:576} INFO -
Triggerer's async thread was blocked for 0.23 seconds, likely by a
badly-written trigger. Set PYTHONASYNCIODEBUG=1 to get more information on
overrunning coroutines.
[2024-01-28T21:14:53.754+0800] {triggerer_job_runner.py:598} INFO - trigger
test_trigger_job_03/scheduled__2024-01-28T12:10:00+00:00/wait_for_file_zhy685/-1/1
(ID 20384) starting
[2024-01-28T21:14:53.754+0800] {triggerer_job_runner.py:598} INFO - trigger
test_trigger_job_03/scheduled__2024-01-28T12:10:00+00:00/wait_for_file_zhy662/-1/1
(ID 20385) starting
[2024-01-28T21:14:53.773+0800] {triggerer_job_runner.py:341} ERROR -
Exception when executing TriggererJobRunner._run_trigger_loop
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
line 339, in _execute
self._run_trigger_loop()
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
line 362, in _run_trigger_loop
self.load_triggers()
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
line 377, in load_triggers
self.trigger_runner.update_triggers(set(ids))
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
line 650, in update_triggers
running_trigger_ids.union(x[0] for x in self.events)
File
"/home/airflow/.local/lib/python3.9/site-packages/airflow/jobs/triggerer_job_runner.py",
line 652, in <genexpr>
.union(x[0] for x in self.to_create)
RuntimeError: deque mutated during iteration
[2024-01-28T21:14:53.773+0800] {triggerer_job_runner.py:344} INFO - Waiting
for triggers to clean up
```
### What you think should happen instead?
Error message shows that to_create deque has been mutated while iterating.
The code in error message is in triggerer main thread _run_trigger_loop(),
main thread iterate to_create queue and add new trigger to queue, that works
fine. But, there is another thread TriggerRunner, which create trigger task
from the same queue.
Also find that the queue iterate is add by
https://github.com/apache/airflow/commit/16b8c476518ed76e3689966ec4b0b788be935410#diff-a5a8f5ab18cc034fa7a64181443415a1cd86aca1ce43a3aaa39e73262941118b
### How to reproduce
Just run large numbers of trigger job.
### Operating System
Redhat 8.9
### Versions of Apache Airflow Providers
_No response_
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else?
Maybe we can iterate a copy of queue?
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]