V0lantis commented on issue #21225:
URL: https://github.com/apache/airflow/issues/21225#issuecomment-1229250816
Hello there 👋
We experienced the same issue in our deployment cluster, and after a lot of
searching, I think I have found why it happens, as well as reproducible steps
to provoke it. I am very willing to submit a PR if I may.
First off, we hit this issue only rarely on our deployment cluster. It
currently runs on **AWS EKS**, Airflow version **2.3.3**, with a **redis**
message broker and the **CeleryExecutor**, of course :).
After a lot of searching, I found that the queued task (or tasks; I don't know
how many we had, because we have 1000+ DAGs and even more tasks) that refused
to get executed appeared when our AWS autoscaler was scaling our pods down. At
that point, when I looked at the two Celery worker pods that were scaled down,
I saw this log message in each pod:
```
worker: Warm shutdown
```
This log message appeared a couple of milliseconds after the scheduler had
sent the task to redis.
What if the worker had consumed the message, but in the meantime Celery shut
down the consumer loop, so the message never got properly executed?
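The suspected race can be simulated in a few lines of plain Python. This is a hypothetical sketch, not Celery internals: a `deque` stands in for the redis list, `popleft` plays the role of `LPOP`, and the `shutting_down` flag stands in for the warm shutdown arriving between the pop and the execution.

```python
# Hypothetical simulation of the suspected race. The queue, the task names
# and the shutting_down flag are illustrative, not Celery internals.
from collections import deque

broker_queue = deque(["task-1", "task-2"])  # messages sitting in redis
executed = []

def worker_loop(shutting_down):
    while broker_queue:
        message = broker_queue.popleft()  # LPOP: message is gone from the broker
        if shutting_down:
            # Warm shutdown arrives between the pop and the execution:
            # the message has been taken off the broker but never runs.
            return message
        executed.append(message)
    return None

lost = worker_loop(shutting_down=True)
print(lost)                # the consumed-but-never-executed task
print(list(broker_queue))  # redis no longer holds it either
```

The lost message is in neither the broker nor the executed list, which is exactly the limbo state described above.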
**BUT**, since the message had been consumed (LPOP), redis no longer had it in
its queue, while the CeleryExecutor still tracked it! That's why we get this
log (after clearing the task):
```
[2022-08-27 21:15:40,100] {base_executor.py:93} INFO - Adding to queue:
['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this',
'manual__2022-08-27T19:13:04.614822+00:00', '--local', '--subdir',
'/Users/arthurvolant/dev/python/airflow/airflow/example_dags/example_bash_operator.py']
[2022-08-27 21:15:40,101] {base_executor.py:213} INFO - task
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this',
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1)
is still running
[2022-08-27 21:15:40,205] {base_executor.py:213} INFO - task
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this',
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1)
is still running
[2022-08-27 21:15:41,292] {base_executor.py:213} INFO - task
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this',
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1)
is still running
[2022-08-27 21:15:42,391] {base_executor.py:213} INFO - task
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this',
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1)
is still running
[2022-08-27 21:15:43,498] {base_executor.py:217} ERROR - could not queue
task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this',
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1)
(still running after 4 attempts)
```
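The "still running" / "could not queue" lines above could come from bookkeeping roughly like this. This is a simplified, hypothetical sketch, not the actual `base_executor.py` code; the names `running`, `try_requeue`, and `MAX_ATTEMPTS` are mine:

```python
# Simplified sketch of executor-side bookkeeping that could emit logs like
# the ones above. Names are illustrative, not Airflow's implementation.
MAX_ATTEMPTS = 4

def try_requeue(key, running, attempts=MAX_ATTEMPTS):
    """Refuse to re-queue a task while the executor still thinks it runs."""
    for _ in range(attempts):
        if key not in running:
            return f"queued {key}"
        print(f"task {key} is still running")  # the repeated INFO line
    return f"could not queue task {key} (still running after {attempts} attempts)"

# The executor still holds the key because the worker never reported back:
result = try_requeue("also_run_this", running={"also_run_this"})
print(result)
```

Since the worker that popped the message is gone, nothing ever removes the key from the running set, so every attempt fails and the task stays queued forever.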
I guess that's why there is a `visibility_timeout`, but unfortunately ours is
25 hours, and some of our tasks cannot suffer any delay (half an hour at
most). Anyway, here are the steps to reproduce it:
1. Start a basic cluster / docker dev env (it still reproduces on the latest
commit, Airflow **2.3.4.dev**): `breeze start-airflow -p 3.10 -b postgres
--integration redis --db-reset`
2. Trigger a dag (in my example, _example_bash_operator_).
3. Tasks will be scheduled and put into the queue by the CeleryExecutor.
4. Connect to redis and LPOP one or more messages: `docker exec -it
docker-compose_redis_1 redis-cli`, then `LPOP default`.
5. Start the Celery worker.
6. Boom 💥. All the tasks which are still queued get run, except the one(s)
you popped out of the queue.
I will try to think of something; I have some ideas and will submit a PR as
soon as possible, since this is very stressful for our deployment.
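As a stopgap, the `visibility_timeout` mentioned earlier can be tuned in `airflow.cfg` under `[celery_broker_transport_options]`. The value below is illustrative (it should be at least as long as your longest-running task, or redis will redeliver messages for tasks that are still legitimately running):

```ini
# airflow.cfg -- the value is an example, not a recommendation;
# redis redelivers unacknowledged messages after this many seconds.
[celery_broker_transport_options]
visibility_timeout = 1800
```

With a lower timeout, a message lost to a warm shutdown would at least be redelivered sooner than our 25-hour setting allows.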
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]