gdevanla opened a new issue #8691:
URL: https://github.com/apache/airflow/issues/8691
**Apache Airflow version**:
1.10.3
**Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
**Environment**:
Python 3.7.4
- **Cloud provider or hardware configuration**:
- **OS** (e.g. from /etc/os-release):
Ubuntu xenial/bionic
- **Kernel** (e.g. `uname -a`):
Linux 4.15.0-45-generic #48~16.04.1-Ubuntu SMP Tue Jan 29 18:03:48 UTC 2019
x86_64 x86_64 x86_64 GNU/Linux
- **Install tools**:
- **Others**:
**What happened**:
The task instance gets stuck in the `scheduled` state because of inconsistent
expectations about how `queued_tasks` entries (tasks that `CeleryExecutor`
failed to enqueue successfully) are to be handled.
Given a TaskInstance `TI` whose state is `None`, the following pseudocode is
executed inside the scheduler loop. In this process, `TI` in some situations
gets stuck in the `scheduled` state.
(The indentation below depicts the call stack.)
```
Given a task instance `TI` with state == None:

execute_helper (scheduler loop)

  (first iteration of the scheduler loop)
  - calls `_execute_task_instances`
    - calls `_find_executable_task_instances()`, which returns `TI`
      (state == None)
    - calls `_change_state_for_executable_task_instances`, which sets
      `TI`'s state to `queued`
    - calls `_enqueue_task_instances_with_queued_state`, which adds `TI`
      to the `Executor.queued_tasks` dictionary
  - calls `CeleryExecutor.heartbeat`
      Tries to send the task to a worker. If this succeeds, `TI` is
      popped from `CeleryExecutor.queued_tasks`. But in our scenario the
      `CeleryExecutor` leaves the entry in `queued_tasks` intact, because
      either an `Exception` was raised or `result` was `None`. The
      `CeleryExecutor` assumes the scheduler will handle this scenario.
      This is where the problem starts (see the second iteration below;
      the link to this code is provided below).
  - calls `_change_state_for_tasks_failed_to_execute`
      This function notices the `TI` entry in
      `CeleryExecutor.queued_tasks`, assumes something went wrong, and
      therefore correctly sets the status of `TI` back to `scheduled`.
      Note that the entry for `TI` is still in `queued_tasks`, and that
      is what causes the current issue (see the second iteration below).
  - other maintenance activities happen in the scheduler loop (not
    relevant to this issue)

  (second iteration of the scheduler loop)
  - calls `_execute_task_instances`
    - calls `_find_executable_task_instances()`
      This function is supposed to return `TI`, since it is in the
      `scheduled` state. But it finds that an entry for `TI` already
      exists in `CeleryExecutor.queued_tasks` and therefore does not
      return `TI` (see the link below, which points to this case). This
      means `TI` will never be queued and is stuck in the `scheduled`
      state.
      (https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033)
```
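The two-iteration deadlock above can be sketched with a toy model (the class and function names below are illustrative simplifications of the scheduler/executor interaction, not real Airflow code):

```python
class FakeCeleryExecutor:
    """Toy model of the executor state relevant to this issue."""

    def __init__(self, send_succeeds):
        self.queued_tasks = {}      # TI key -> payload, filled by the scheduler
        self.running = set()        # TI keys successfully handed to workers
        self.send_succeeds = send_succeeds

    def heartbeat(self):
        # Mirrors the behaviour described above: on success the entry is
        # popped and moves to running; on failure (an exception was raised
        # or result was None) the entry is left in queued_tasks intact.
        for key in list(self.queued_tasks):
            if self.send_succeeds:
                self.queued_tasks.pop(key)
                self.running.add(key)


def find_executable_task_instances(scheduled_keys, executor):
    # Mirrors the check at scheduler_job.py#L1033: a scheduled TI is
    # skipped if the executor still has it in queued_tasks (or running).
    return [k for k in scheduled_keys
            if k not in executor.queued_tasks and k not in executor.running]


# First scheduler iteration: TI is queued, the send fails, and
# _change_state_for_tasks_failed_to_execute resets its state to
# "scheduled" -- but the stale queued_tasks entry survives.
executor = FakeCeleryExecutor(send_succeeds=False)
executor.queued_tasks["TI"] = "payload"
executor.heartbeat()

# Second iteration: TI is in "scheduled" state, yet it is never returned.
print(find_executable_task_instances(["TI"], executor))  # prints []
```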
The only workaround currently is to restart the scheduler. When the
scheduler is restarted, `CeleryExecutor.queued_tasks` is reset, and the
`TI` is therefore queued again.
The code where the `queued_tasks` entry is updated by popping the `TI` is here:
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/executors/celery_executor.py#L223
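In outline, the update at that location behaves roughly as follows (an illustrative paraphrase with made-up helper names, not the actual code):

```python
def update_task_state(queued_tasks, key, result, exception=None):
    """Illustrative paraphrase: the queued_tasks entry is popped only
    when the send to the worker produced a usable result."""
    if exception is not None or result is None:
        # Failure path: the entry is left in queued_tasks, on the
        # assumption that the scheduler will handle it. This stale entry
        # is what later blocks _find_executable_task_instances.
        return
    queued_tasks.pop(key)


queued = {"TI": "payload"}
update_task_state(queued, "TI", result=None)  # a failed send
print("TI" in queued)  # prints True: the stale entry remains
```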
The code due to which `TI` gets stuck in `scheduled` state is here:
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033
I think the code here should only check whether the `CeleryExecutor.running`
dictionary has an entry for `TI`. But I am not sure how this affects other
executors.
**What you expected to happen**:
The `_find_executable_task_instances()` function should only check whether
`CeleryExecutor.running` contains an entry for `TI`, and should return `TI`
as part of its list of tasks to be queued.
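A minimal sketch of that proposed change (hypothetical names; the real fix would live inside `_find_executable_task_instances`):

```python
def find_executable_task_instances_fixed(scheduled_keys, executor):
    # Proposed behaviour: only skip a scheduled TI when the executor is
    # actually running it. A stale queued_tasks entry left behind by a
    # failed send no longer blocks the TI from being re-queued.
    return [k for k in scheduled_keys if k not in executor.running]


class StubExecutor:
    # Executor state after the failed first iteration: the stale entry is
    # still in queued_tasks, but nothing is actually running.
    queued_tasks = {"TI": "payload"}
    running = set()


print(find_executable_task_instances_fixed(["TI"], StubExecutor()))
# prints ['TI']: the task can be queued again despite the stale entry
```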
**How to reproduce it**:
It can be reproduced by forcing the `result` value in
`CeleryExecutor.heartbeat` to be an `ExceptionWithTraceback` object or `None`.
(Note: the links point to the `master` branch, but the problem applies to
1.10.3 and later versions.)
**Anything else we need to know**:
I have not been able to identify a scenario in which `result` is `None` in
`CeleryExecutor.heartbeat`; looking at the `Celery.app` module, it seems the
`result` can never be `None`. But I suspect there are scenarios where the
`result` is `None` and the `CeleryExecutor` therefore does not pop the `TI`
from the queue. I am not able to prove this concretely.
This also happens with later versions of Airflow. In later versions, the
`CeleryExecutor.trigger_tasks` function performs the same set of operations.
The code has moved around between versions, but the logic remains the same,
and the problem exists in later versions as well.