V0lantis commented on issue #21225:
URL: https://github.com/apache/airflow/issues/21225#issuecomment-1229250816

   Hello there 👋 
   
   We experienced the same issue in our deployment cluster, and after a lot of searching, I think I have found why it is happening, as well as some reproducible steps to provoke it. I am very willing to submit a PR if I may.
   
   First off, we experienced this issue very rarely on our deployment cluster.
   Our deployment cluster currently runs on **AWS EKS**, Airflow version **2.3.3**, with a **redis** message broker and the **CeleryExecutor**, of course :).
   After a lot of searching, I found out that the queued task (or queued tasks; I don't know how many we had, because we have 1000+ dags and even more tasks) that didn't want to get executed appeared when our AWS autoscaler was scaling down our pods. At that point, when I looked at the two celery worker pods that were scaled down, I saw a log message in each pod:
   
   ```
   worker: Warm shutdown
   ```
   This log message appeared a couple of milliseconds after the scheduler had sent the task to redis.
   What if the worker had consumed the message, but in the meantime Celery shut down the consumer loop, and the message never got properly executed? **BUT**, since the message had been consumed (LPOP), redis no longer had it in its queue, while the CeleryExecutor still did! That's why we get this log (after clearing the task again):
   
   ```
   [2022-08-27 21:15:40,100] {base_executor.py:93} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'example_bash_operator', 'also_run_this', 
'manual__2022-08-27T19:13:04.614822+00:00', '--local', '--subdir', 
'/Users/arthurvolant/dev/python/airflow/airflow/example_dags/example_bash_operator.py']
   [2022-08-27 21:15:40,101] {base_executor.py:213} INFO - task 
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', 
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) 
is still running
   [2022-08-27 21:15:40,205] {base_executor.py:213} INFO - task 
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', 
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) 
is still running
   [2022-08-27 21:15:41,292] {base_executor.py:213} INFO - task 
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', 
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) 
is still running
   [2022-08-27 21:15:42,391] {base_executor.py:213} INFO - task 
TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', 
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) 
is still running
   [2022-08-27 21:15:43,498] {base_executor.py:217} ERROR - could not queue 
task TaskInstanceKey(dag_id='example_bash_operator', task_id='also_run_this', 
run_id='manual__2022-08-27T19:13:04.614822+00:00', try_number=1, map_index=-1) 
(still running after 4 attempts)
   ```
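To make the mechanism behind that log concrete, here is a minimal toy sketch of the executor side of the race. This is **not** Airflow's actual `base_executor.py`; the class, method, and constant names are made up for illustration. The idea is that the executor keeps a set of task keys it believes are running, and it refuses to re-queue a key while it is still in that set, which never resolves if the broker silently lost the message:

```python
# Toy model (illustrative names, not Airflow's real code) of an executor that
# refuses to re-queue a task key it still believes a worker is executing.

QUEUEING_ATTEMPTS = 5  # assumption: mirrors the "after 4 attempts" in the log above


class ToyExecutor:
    def __init__(self):
        self.queued_tasks = {}  # keys waiting to be sent to the broker
        self.running = set()    # keys the executor believes a worker is executing
        self.attempts = {}      # per-key count of failed re-queue attempts

    def try_queue(self, key):
        if key in self.running:
            # The broker lost the message (LPOP during warm shutdown), but the
            # executor never received a result, so `key` is stuck in `running`.
            self.attempts[key] = self.attempts.get(key, 0) + 1
            if self.attempts[key] >= QUEUEING_ATTEMPTS:
                return (f"could not queue task {key} "
                        f"(still running after {self.attempts[key] - 1} attempts)")
            return f"task {key} is still running"
        self.queued_tasks[key] = None
        return f"Adding to queue: {key}"


executor = ToyExecutor()
executor.running.add("also_run_this")  # worker shut down right after consuming it
for _ in range(5):
    print(executor.try_queue("also_run_this"))
```

Run as-is, this prints "task also_run_this is still running" four times and then the final "could not queue task" error, i.e. exactly the shape of the scheduler log above.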
   
   I guess that's why there is a `visibility_timeout`, but unfortunately ours is 25 hours, and some of our tasks cannot suffer any delay (half an hour at most). Anyway, here are the steps to reproduce it:
   
   1. Start a basic cluster / docker dev env (it still works on the latest commit, Airflow **2.3.4.dev**): `breeze start-airflow -p 3.10 -b postgres --integration redis --db-reset`
   2. Start a dag (in my example, I am starting _example_bash_operator_).
   3. Tasks will be scheduled and put into the queue by the CeleryExecutor.
   4. Connect to redis and LPOP one or more messages: `docker exec -it docker-compose_redis_1 redis-cli`, then `LPOP default`.
   5. Start the celery worker.
   6. Boom 💥. All the tasks that were still queued are run, except the one(s) you popped out of the queue.
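For anyone hitting this in the meantime, the `visibility_timeout` I mentioned can be lowered through Celery's broker transport options in `airflow.cfg` (the value below is illustrative; with a redis broker, unacknowledged messages are redelivered after this many seconds, so a lower value bounds how long a lost task stays stuck):

```ini
[celery_broker_transport_options]
visibility_timeout = 1800
```

Note this is only a workaround: it trades the stuck-task window for a risk of redelivering genuinely long-running tasks, so it has to be larger than your longest expected task duration.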
   
   I will try to think of something; I have some ideas, and I will submit a PR as soon as possible, since this is very stressful for our deployment.
   

