hterik opened a new issue, #35843:
URL: https://github.com/apache/airflow/issues/35843
### Apache Airflow version
Other Airflow 2 version (please specify below)
### What happened
A task assigned to a Celery queue was queued but never started on a worker.
It stayed stuck in state PENDING even though the Celery queue had spare capacity.
```
[2023-11-24T11:41:49.714+0000] {scheduler_job_runner.py:414} INFO - 1 tasks
up for execution:
<TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
[2023-11-24T11:41:49.714+0000] {scheduler_job_runner.py:593} INFO - Setting
the following tasks to queued state:
<TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
[2023-11-24T11:41:49.716+0000] {scheduler_job_runner.py:636} INFO - Sending
TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID',
try_number=1, map_index=-1) to executor with priority 1 and queue MYQUEUE
[2023-11-24T11:41:49.716+0000] {base_executor.py:144} INFO - Adding to
queue: ['airflow', 'tasks', 'run', 'MYDAGID', 'MYTASKID', 'MYRUNID', '--local',
'--subdir', 'DAGS_FOLDER/dag_xxx.py']
[2023-11-24T11:41:49.766+0000] {scheduler_job_runner.py:686} INFO - Received
executor event with state queued for task instance
TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID',
try_number=1, map_index=-1)
[2023-11-24T11:41:49.769+0000] {scheduler_job_runner.py:713} INFO - Setting
external_id for <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> to
5502f29e-4f17-40e3-b91e-519c3fb4606b
[2023-11-24T14:21:45.680+0000] {celery_executor.py:439} INFO - Adopted the
following 22 tasks from a dead executor
...
<TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> in state PENDING
[2023-11-24T14:48:24.426+0000] {celery_executor.py:439} INFO - Adopted the
following 25 tasks from a dead executor
...
<TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> in state PENDING
```
At this point, if I inspect the Celery queue, it does not contain any
item with the ID 5502f29e-4f17-40e3-b91e-519c3fb4606b.
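One way to make this check repeatable is to scan the raw broker messages for the task ID. The sketch below is only an illustration under assumptions: it assumes each message is a JSON document that carries the Celery task ID under `headers.id` (as in Celery's message protocol version 2), and `queued_task_ids` is a hypothetical helper name, not part of Airflow or Celery.

```python
import json


def queued_task_ids(raw_messages):
    """Return the set of Celery task ids found in raw broker messages.

    Assumption: each message is a JSON object (str or bytes) with the
    task id stored under headers["id"]. Anything else is skipped.
    """
    ids = set()
    for raw in raw_messages:
        if isinstance(raw, bytes):
            raw = raw.decode("utf-8", errors="replace")
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            continue  # not a JSON message, ignore it
        if not isinstance(message, dict):
            continue
        headers = message.get("headers") or {}
        task_id = headers.get("id")
        if task_id:
            ids.add(task_id)
    return ids
```

With a Redis broker, which is an assumption about this deployment, the raw messages could be fetched with something like `redis.Redis(...).lrange("MYQUEUE", 0, -1)`. An external_id that is missing from the returned set matches the situation described here.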
After clicking **Clear task** in the Airflow UI, the scheduler keeps failing
to schedule the task, as follows:
```
[2023-11-24T15:44:58.737+0000] {scheduler_job_runner.py:414} INFO - 1 tasks
up for execution:
<TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
[2023-11-24T15:44:58.738+0000] {scheduler_job_runner.py:593} INFO - Setting
the following tasks to queued state:
<TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
[2023-11-24T15:44:58.739+0000] {scheduler_job_runner.py:636} INFO - Sending
TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID',
try_number=1, map_index=-1) to executor with priority 1 and queue MYQUEUE
[2023-11-24T15:44:58.739+0000] {base_executor.py:144} INFO - Adding to
queue: ['airflow', 'tasks', 'run', 'MYDAGID', 'MYTASKID', 'MYRUNID', '--local',
'--subdir', 'DAGS_FOLDER/dag_xxx.py']
[2023-11-24T15:44:58.741+0000] {base_executor.py:277} INFO - queued but
still running; attempt=1 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:44:59.045+0000] {base_executor.py:277} INFO - queued but
still running; attempt=2 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:44:59.368+0000] {base_executor.py:277} INFO - queued but
still running; attempt=3 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:00.669+0000] {base_executor.py:277} INFO - queued but
still running; attempt=4 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:01.161+0000] {base_executor.py:277} INFO - queued but
still running; attempt=5 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:02.069+0000] {base_executor.py:277} INFO - queued but
still running; attempt=6 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:03.339+0000] {base_executor.py:277} INFO - queued but
still running; attempt=7 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:04.754+0000] {base_executor.py:277} INFO - queued but
still running; attempt=8 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:06.083+0000] {base_executor.py:277} INFO - queued but
still running; attempt=9 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:07.378+0000] {base_executor.py:277} INFO - queued but
still running; attempt=10 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:08.629+0000] {base_executor.py:277} INFO - queued but
still running; attempt=11 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:09.675+0000] {base_executor.py:277} INFO - queued but
still running; attempt=12 task=TaskInstanceKey(dag_id='MYDAGID',
task_id='MYTASKID', run_id='MYRUNID', try_number=1, map_index=-1)
[2023-11-24T15:45:10.983+0000] {base_executor.py:280} ERROR - could not
queue task TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID',
run_id='MYRUNID', try_number=1, map_index=-1) (still running after 12 attempts)
```
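The messages above suggest that the executor still had this TaskInstanceKey recorded as running, so every attempt to queue the same key was refused until it gave up. The following is a toy model of that pattern, not Airflow's implementation: `ToyExecutor`, its attributes, and the count-based limit of 12 are hypothetical and chosen only to mirror the log output (the real executor may use a different limit, for example a time window).

```python
class ToyExecutor:
    """Toy model of an executor that refuses to queue a key it thinks is running."""

    def __init__(self, max_attempts=12):
        self.running = set()   # keys the executor believes are still running
        self.queued = {}       # key -> command waiting to be sent
        self.attempts = {}     # key -> number of blocked attempts so far
        self.max_attempts = max_attempts  # hypothetical limit, matches the log

    def queue(self, key, command):
        self.queued[key] = command

    def trigger(self):
        """Run one pass over the queue and return (outcome, key, attempt) tuples."""
        events = []
        for key in list(self.queued):
            if key in self.running:
                tries = self.attempts.get(key, 0)
                if tries < self.max_attempts:
                    # "queued but still running; attempt=N"
                    self.attempts[key] = tries + 1
                    events.append(("retry", key, tries + 1))
                    continue
                # "could not queue task ... (still running after N attempts)"
                events.append(("gave_up", key, tries))
                del self.attempts[key]
                del self.queued[key]
                continue
            # Key is not known as running: hand it to the worker queue.
            self.running.add(key)
            del self.queued[key]
            events.append(("sent", key, 0))
        return events
```

In this model a stale entry in `running` blocks every later submission of the same key, while a fresh `ToyExecutor` (comparable to a restarted process with empty in-memory state) sends the key on the first pass. Whether this is the actual cause in Airflow is not established by the logs.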
I tried **Clear task** twice, and each time it gave exactly the same result
as above.
At this point, I restarted airflow-scheduler.
```
[2023-11-24T16:17:57.328+0000] {scheduler_job_runner.py:1650} INFO - Reset
the following 1 orphaned TaskInstances:
<TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]>
[2023-11-24T16:17:57.876+0000] {scheduler_job_runner.py:414} INFO - 2 tasks
up for execution:
<TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
<TaskInstance: autobump_platform_master.first_seen_commit
autobump_platform_master-412983e6bd [scheduled]>
[2023-11-24T16:17:57.877+0000] {scheduler_job_runner.py:593} INFO - Setting
the following tasks to queued state:
<TaskInstance: MYDAGID.MYTASKID MYRUNID [scheduled]>
<TaskInstance: autobump_platform_master.first_seen_commit
autobump_platform_master-412983e6bd [scheduled]>
[2023-11-24T16:17:57.880+0000] {scheduler_job_runner.py:636} INFO - Sending
TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID',
try_number=1, map_index=-1) to executor with priority 1 and queue MYQUEUE
[2023-11-24T16:17:57.880+0000] {base_executor.py:144} INFO - Adding to
queue: ['airflow', 'tasks', 'run', 'MYDAGID', 'MYTASKID', 'MYRUNID', '--local',
'--subdir', 'DAGS_FOLDER/dag_xxx.py']
[2023-11-24T16:17:58.377+0000] {scheduler_job_runner.py:686} INFO - Received
executor event with state queued for task instance
TaskInstanceKey(dag_id='MYDAGID', task_id='MYTASKID', run_id='MYRUNID',
try_number=1, map_index=-1)
[2023-11-24T16:17:58.387+0000] {scheduler_job_runner.py:713} INFO - Setting
external_id for <TaskInstance: MYDAGID.MYTASKID MYRUNID [queued]> to
7f00daa5-1ff9-48d9-9d3d-dacf871c27d6
[2023-11-24T16:19:16.805+0000] {celery_executor.py:439} INFO - Adopted the
following 18 tasks from a dead executor
<TaskInstance: MYDAGID.MYTASKID MYRUNID [running]> in state STARTED
...
```
Now it works. Note that a new external_id was assigned,
7f00daa5-1ff9-48d9-9d3d-dacf871c27d6, which I can observe in the Celery queue.
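To compare the external_id values across attempts, the scheduler log can be scanned for the "Setting external_id" messages shown above. This is a minimal sketch that assumes the message wording matches the excerpts in this report; `external_ids` and the regular expression are hypothetical helpers, not Airflow APIs.

```python
import re

# Matches "Setting external_id for <TaskInstance: DAG.TASK RUN [state]> to <uuid>",
# tolerating line wraps between words (as in the excerpts above).
EXTERNAL_ID_RE = re.compile(
    r"Setting\s+external_id\s+for\s+<TaskInstance:\s+(?P<task>\S+)\s+(?P<run_id>\S+)\s+"
    r"\[(?P<state>\w+)\]>\s+to\s+"
    r"(?P<external_id>[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12})"
)


def external_ids(log_text):
    """Return (task, run_id, external_id) tuples in the order they appear."""
    return [
        (m["task"], m["run_id"], m["external_id"])
        for m in EXTERNAL_ID_RE.finditer(log_text)
    ]
```

Each returned external_id can then be looked up in the Celery queue to see which submissions actually reached the broker.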
### What you think should happen instead
1. Celery tasks should not get stuck.
2. Clicking **Clear task** should reschedule stuck tasks more reliably.
3. Restarting airflow-scheduler should not be needed to reschedule stuck
tasks.
### How to reproduce
Dunno :(
### Operating System
Debian GNU/Linux 11 (bullseye)
### Versions of Apache Airflow Providers
apache-airflow==2.7.1
apache-airflow-client==2.1.0
apache-airflow-providers-celery==3.3.3
apache-airflow-providers-cncf-kubernetes==7.5.0
### Deployment
Official Apache Airflow Helm Chart
### Deployment details
_No response_
### Anything else
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)