hterik opened a new issue, #35843:
URL: https://github.com/apache/airflow/issues/35843

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### What happened
   
   A task assigned to a Celery queue was queued but never started on a worker.
   It stayed in state PENDING even though the Celery queue had spare capacity.
   ```
   [2023-11-24T11:41:49.714+0000] {scheduler_job_runner.py:414} INFO - 1 tasks up for execution:
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
   [2023-11-24T11:41:49.714+0000] {scheduler_job_runner.py:593} INFO - Setting the following tasks to queued state:
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
   [2023-11-24T11:41:49.716+0000] {scheduler_job_runner.py:636} INFO - Sending TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1) to executor with priority 1 and queue MYQUEUE
   [2023-11-24T11:41:49.716+0000] {base_executor.py:144} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'MYDAGID', 'MYTASKID', 'MYRUNID', '--local', '--subdir', 'DAGS_FOLDER/dag_xxx.py']
   [2023-11-24T11:41:49.766+0000] {scheduler_job_runner.py:686} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T11:41:49.769+0000] {scheduler_job_runner.py:713} INFO - Setting external_id for <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> to 5502f29e-4f17-40e3-b91e-519c3fb4606b
   
   [2023-11-24T14:21:45.680+0000] {celery_executor.py:439} INFO - Adopted the following 22 tasks from a dead executor
       ...
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> in state PENDING
   [2023-11-24T14:48:24.426+0000] {celery_executor.py:439} INFO - Adopted the following 25 tasks from a dead executor
       ...
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> in state PENDING
   ```
   
   At this point, if I inspect the Celery queue, it contains no item with the 
ID 5502f29e-4f17-40e3-b91e-519c3fb4606b.
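
For anyone triaging a similar case, the ID to look for in the Celery queue can be pulled out of the scheduler log. The snippet below is a minimal sketch, not part of Airflow: the function name `extract_external_ids` and the regular expression are assumptions based only on the `Setting external_id for ...` line format shown in the log above.

```python
import re

# Matches the scheduler line that records the Celery task ID (external_id).
# The pattern is an assumption derived from the log format in this report.
EXTERNAL_ID_RE = re.compile(
    r"Setting external_id for <TaskInstance: (?P<ti>.+?) \[(?P<state>\w+)\]> to "
    r"(?P<external_id>[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})"
)


def extract_external_ids(log_text):
    """Return (task instance, external_id) pairs found in scheduler log text."""
    # Collapse all whitespace so hard-wrapped log lines still match.
    flat = " ".join(log_text.split())
    return [(m.group("ti"), m.group("external_id")) for m in EXTERNAL_ID_RE.finditer(flat)]


sample = """
[2023-11-24T11:41:49.769+0000] {scheduler_job_runner.py:713} INFO - Setting
external_id for <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> to
5502f29e-4f17-40e3-b91e-519c3fb4606b
"""
pairs = extract_external_ids(sample)
print(pairs)
```

The extracted ID can then be compared against whatever the broker actually holds.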
   
   After clicking **Clear task** in the Airflow UI, the scheduler still fails 
to queue the task, as follows:
   ```
   [2023-11-24T15:44:58.737+0000] {scheduler_job_runner.py:414} INFO - 1 tasks up for execution:
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
   [2023-11-24T15:44:58.738+0000] {scheduler_job_runner.py:593} INFO - Setting the following tasks to queued state:
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
   [2023-11-24T15:44:58.739+0000] {scheduler_job_runner.py:636} INFO - Sending TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1) to executor with priority 1 and queue MYQUEUE
   [2023-11-24T15:44:58.739+0000] {base_executor.py:144} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'MYDAGID', 'MYTASKID', 'MYRUNID', '--local', '--subdir', 'DAGS_FOLDER/dag_xxx.py']
   
   [2023-11-24T15:44:58.741+0000] {base_executor.py:277} INFO - queued but still running; attempt=1 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:44:59.045+0000] {base_executor.py:277} INFO - queued but still running; attempt=2 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:44:59.368+0000] {base_executor.py:277} INFO - queued but still running; attempt=3 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:00.669+0000] {base_executor.py:277} INFO - queued but still running; attempt=4 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:01.161+0000] {base_executor.py:277} INFO - queued but still running; attempt=5 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:02.069+0000] {base_executor.py:277} INFO - queued but still running; attempt=6 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:03.339+0000] {base_executor.py:277} INFO - queued but still running; attempt=7 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:04.754+0000] {base_executor.py:277} INFO - queued but still running; attempt=8 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:06.083+0000] {base_executor.py:277} INFO - queued but still running; attempt=9 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:07.378+0000] {base_executor.py:277} INFO - queued but still running; attempt=10 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:08.629+0000] {base_executor.py:277} INFO - queued but still running; attempt=11 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:09.675+0000] {base_executor.py:277} INFO - queued but still running; attempt=12 task=TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T15:45:10.983+0000] {base_executor.py:280} ERROR - could not queue task TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1) (still running after 12 attempts)
   ```
   
   I tried **Clear task** twice, with exactly the same result each time.
   At this point, I restarted airflow-scheduler:
   ```
   [2023-11-24T16:17:57.328+0000] {scheduler_job_runner.py:1650} INFO - Reset the following 1 orphaned TaskInstances:
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]>
   [2023-11-24T16:17:57.876+0000] {scheduler_job_runner.py:414} INFO - 2 tasks up for execution:
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
       <TaskInstance: autobump_platform_master.first_seen_commit autobump_platform_master-412983e6bd [scheduled]>
   [2023-11-24T16:17:57.877+0000] {scheduler_job_runner.py:593} INFO - Setting the following tasks to queued state:
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
       <TaskInstance: autobump_platform_master.first_seen_commit autobump_platform_master-412983e6bd [scheduled]>
   [2023-11-24T16:17:57.880+0000] {scheduler_job_runner.py:636} INFO - Sending TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1) to executor with priority 1 and queue MYQUEUE
   [2023-11-24T16:17:57.880+0000] {base_executor.py:144} INFO - Adding to queue: ['airflow', 'tasks', 'run', 'MYDAGID', 'MYTASKID', 'MYRUNID', '--local', '--subdir', 'DAGS_FOLDER/dag_xxx.py']
   [2023-11-24T16:17:58.377+0000] {scheduler_job_runner.py:686} INFO - Received executor event with state queued for task instance TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
   [2023-11-24T16:17:58.387+0000] {scheduler_job_runner.py:713} INFO - Setting external_id for <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> to 7f00daa5-1ff9-48d9-9d3d-dacf871c27d6
   [2023-11-24T16:19:16.805+0000] {celery_executor.py:439} INFO - Adopted the following 18 tasks from a dead executor
       <TaskInstance: MYDAGID.MYTASKID MYRUNID [running]> in state STARTED
        ...
   ```
   Now it works. Note that the task received a new external_id, 
7f00daa5-1ff9-48d9-9d3d-dacf871c27d6, which I can see in the Celery queue.
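
One possible reading of the logs, offered as an assumption rather than a confirmed diagnosis: the executor keeps an in-memory record of task keys it believes are running, refuses to queue a key that is already in that record, and gives up after 12 refused attempts. If the original Celery message was lost, the key would never leave that record, so **Clear task** could not help, while a scheduler restart would begin with an empty record. The toy model below only illustrates that reasoning. `ToyExecutor`, `try_queue`, and `MAX_ATTEMPTS` are hypothetical names and do not reproduce Airflow's actual code.

```python
from dataclasses import dataclass, field

MAX_ATTEMPTS = 12  # mirrors "still running after 12 attempts" in the log


@dataclass
class ToyExecutor:
    """Hypothetical stand-in for an executor; not Airflow's real class."""

    running: set = field(default_factory=set)     # keys believed to be in flight
    attempts: dict = field(default_factory=dict)  # refused queue attempts per key

    def try_queue(self, key):
        """Return 'queued', 'retry', or 'failed' for one scheduling pass."""
        if key not in self.running:
            self.running.add(key)
            self.attempts.pop(key, None)
            return "queued"
        # The key is still tracked as running, so the new request is refused.
        self.attempts[key] = self.attempts.get(key, 0) + 1
        if self.attempts[key] > MAX_ATTEMPTS:
            del self.attempts[key]
            return "failed"
        return "retry"


key = ("MYDAGID", "MYTASKID", "MYRUNID", 1, -1)

# Executor that still holds the key of the lost task: clearing cannot get through.
stale = ToyExecutor(running={key})
outcomes = [stale.try_queue(key) for _ in range(13)]

# A restarted scheduler is modelled as an executor with empty in-memory state.
fresh = ToyExecutor()
restart_outcome = fresh.try_queue(key)

print(outcomes.count("retry"), outcomes[-1], restart_outcome)
```

In this model the stale executor refuses the key 12 times and then reports failure, while the fresh one queues it at once, which matches the sequence observed before and after the restart.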
   
   
   ### What you think should happen instead
   
   1. Celery tasks should not get stuck 
   2. Clicking Clear task should reschedule stuck tasks more reliably
   3. Restarting airflow-scheduler should not be needed to reschedule stuck tasks.
   
   ### How to reproduce
   
   Unknown. I have not found a way to reproduce this.
   
   ### Operating System
   
   Debian GNU/Linux 11 (bullseye)
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow==2.7.1
   apache-airflow-client==2.1.0
   apache-airflow-providers-celery==3.3.3
   apache-airflow-providers-cncf-kubernetes==7.5.0
   
   ### Deployment
   
   Official Apache Airflow Helm Chart
   
   ### Deployment details
   
   _No response_
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

