RNHTTR opened a new issue, #28120:
URL: https://github.com/apache/airflow/issues/28120
> Note: This has cropped up in at least 2.3.x and remains in 2.4.3.
The links to Airflow source code are from the 2.3.1 release.
It seems that the `airflow tasks run <task>` command is failing on the Celery
worker:
```
airflow.exceptions.AirflowException: Celery command failed on host: <host>
with celery_task_id 20ec4a6d-21b4-4838-b7f3-fb5d52c538ee
```
The Celery status is set to failed, but the task instance in Airflow remains
queued for an arbitrary amount of time (often hours):
```
{scheduler_job.py:599} INFO - Executor reports execution of <task>
run_id=scheduled__2022-10-26T23:00:00+00:00 exited with status failed for
try_number 1
{scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=<dag id>,
task_id=<task id>, run_id=scheduled__2022-10-26T23:00:00+00:00, map_index=-1,
run_start_date=None, run_end_date=None, run_duration=None, state=queued,
executor_state=failed, try_number=1, max_tries=2, job_id=None,
pool=default_pool, queue=default, priority_weight=3, operator=DummyOperator,
queued_dttm=2022-10-27 00:09:00.545894+00:00, queued_by_job_id=2664047, pid=None
```
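The log line above is the quickest way to spot affected tasks. The following is a minimal sketch, not part of Airflow, that parses the `key=value` fields of a `TaskInstance Finished:` line and flags the `state=queued` / `executor_state=failed` mismatch. It assumes each field is separated by `, ` and that the whole entry sits on one line; `parse_ti_finished`, `is_stuck_in_queued`, and the `my_dag`/`my_task` ids are hypothetical.

```python
from __future__ import annotations

import re

# One `key=value` pair; a value runs until the next ", <key>=" or the end of the line.
FIELD_RE = re.compile(r"(\w+)=(.*?)(?=, \w+=|$)")


def parse_ti_finished(line: str) -> dict[str, str]:
    """Extract the key=value fields from a 'TaskInstance Finished:' log line."""
    _, _, payload = line.partition("TaskInstance Finished: ")
    return dict(FIELD_RE.findall(payload))


def is_stuck_in_queued(fields: dict[str, str]) -> bool:
    """True when the executor reported failure but the task instance is still queued."""
    return fields.get("state") == "queued" and fields.get("executor_state") == "failed"


line = (
    "{scheduler_job.py:642} INFO - TaskInstance Finished: dag_id=my_dag, "
    "task_id=my_task, run_id=scheduled__2022-10-26T23:00:00+00:00, map_index=-1, "
    "state=queued, executor_state=failed, try_number=1, max_tries=2"
)
print(is_stuck_in_queued(parse_ti_finished(line)))  # True
```

The lazy match with a lookahead keeps values such as `run_id` and `queued_dttm`, which contain `:` and spaces, intact.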
Note the `state=queued` and `executor_state=failed`: Airflow should be
marking the task as failed. When this happens, these tasks also bypass
`stalled_task_timeout`, because when
[`update_task_state`](https://github.com/apache/airflow/blob/2.3.1/airflow/executors/celery_executor.py#L456-L465)
is called, the Celery state is `STARTED`, and on that path
`self._set_celery_pending_task_timeout(key, None)` removes the task from the
list of tasks eligible for `stalled_task_timeout`. As a result, these tasks
remain queued indefinitely.
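To make the bypass concrete, here is a deliberately simplified model of stalled-task bookkeeping. It is not Airflow's implementation: `PendingTimeoutTracker`, `set_pending_timeout`, and `sweep` are hypothetical names, and the assumption is only that the timeout check looks at a per-key deadline and that passing `None` drops the key, as the `(key, None)` call described above does.

```python
from __future__ import annotations

from datetime import datetime, timedelta


class PendingTimeoutTracker:
    """Hypothetical, simplified model of stalled-task bookkeeping (not Airflow's code)."""

    def __init__(self, stalled_task_timeout: timedelta) -> None:
        self.stalled_task_timeout = stalled_task_timeout
        # key -> deadline; only keys in this dict can ever be reported as stalled
        self.deadlines: dict[str, datetime] = {}

    def set_pending_timeout(self, key: str, queued_at: datetime | None) -> None:
        if queued_at is None:
            # Mirrors the `(key, None)` call in the issue: stop watching this key.
            self.deadlines.pop(key, None)
        else:
            self.deadlines[key] = queued_at + self.stalled_task_timeout

    def sweep(self, now: datetime) -> list[str]:
        """Return (and stop tracking) keys whose deadline has passed."""
        expired = [key for key, deadline in self.deadlines.items() if deadline <= now]
        for key in expired:
            del self.deadlines[key]
        return expired


queued_at = datetime(2022, 10, 27, 0, 9)
tracker = PendingTimeoutTracker(stalled_task_timeout=timedelta(minutes=10))
tracker.set_pending_timeout("pending_task", queued_at)  # genuinely stuck in Celery
tracker.set_pending_timeout("failed_task", queued_at)   # `airflow tasks run` failed
tracker.set_pending_timeout("failed_task", None)        # cleared, as described above
print(tracker.sweep(queued_at + timedelta(hours=3)))    # ['pending_task']
```

Even hours later, only `pending_task` is reported; `failed_task` is invisible to the sweep because its entry was removed rather than expired.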
___
Summary of what's happening:
1. [CeleryExecutor’s `update_task_state` method calls
`fail()`](https://github.com/apache/airflow/blob/2.3.1/airflow/executors/celery_executor.py#L455),
which is a method from BaseExecutor.
2. [BaseExecutor’s](https://github.com/apache/airflow/blob/ee100a592e07761f3b32294f3ad82c4a6c7cf74d/airflow/executors/base_executor.py#L245)
`fail` calls CeleryExecutor’s `change_state` method.
3. [CeleryExecutor’s `change_state`
method](https://github.com/apache/airflow/blob/ee100a592e07761f3b32294f3ad82c4a6c7cf74d/airflow/executors/celery_executor.py#L450)
calls BaseExecutor’s `change_state` method via `super()`.
4. The crux: [BaseExecutor’s `change_state`
method](https://github.com/apache/airflow/blob/ee100a592e07761f3b32294f3ad82c4a6c7cf74d/airflow/executors/base_executor.py#L230)
is as follows:
```python
self.log.debug("Changing state: %s", key)
try:
    self.running.remove(key)
except KeyError:
    self.log.debug('Could not find key: %s', str(key))
```
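The effect of that `try`/`except` can be seen in isolation. The sketch below applies the same pattern to a plain `set`; the function name `remove_from_running` is hypothetical, and unlike the quoted method it returns a flag, added here only so the silent path is observable. For a key that is not in the set, the outcome is the same as `set.discard`: nothing is raised and execution simply moves on.

```python
from __future__ import annotations

import logging

log = logging.getLogger("executor_sketch")


def remove_from_running(running: set[str], key: str) -> bool:
    """Apply the quoted try/except pattern; report whether the key was running."""
    try:
        running.remove(key)
    except KeyError:
        # Same effect as running.discard(key): nothing propagates to the caller.
        log.debug("Could not find key: %s", key)
        return False
    return True


running = {"task_a"}
print(remove_from_running(running, "never_started"), running)  # False {'task_a'}
```

Because the miss is only logged at debug level, the caller has no signal that the task never ran.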
Because the `airflow tasks run` command failed, the task is never set to the
running state. The `except KeyError` block allows the code to continue
unabated. Once BaseExecutor’s `change_state` method completes,
[CeleryExecutor’s `change_state` method
completes](https://github.com/apache/airflow/blob/ee100a592e07761f3b32294f3ad82c4a6c7cf74d/airflow/executors/celery_executor.py#L450):
```python
def change_state(self, key: TaskInstanceKey, state: str, info=None) -> None:
super().change_state(key, state, info)
self.tasks.pop(key, None)
self._set_celery_pending_task_timeout(key, None)
```
`self._set_celery_pending_task_timeout(key, None)` removes the task from the
list of tasks that `stalled_task_timeout` checks for, allowing the tasks to
remain in queued indefinitely.
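Putting the four steps together, the toy classes below trace `fail()` through both `change_state` methods. They are hypothetical stand-ins, not Airflow classes: `ToyBaseExecutor`, `ToyCeleryExecutor`, and `pending_deadlines` are illustrative names. The `event_buffer` write is an assumption about how the reported state reaches the scheduler, consistent with the `executor_state=failed` seen in the scheduler log. As the issue describes, the key is assumed never to have been added to `running`.

```python
from __future__ import annotations


class ToyBaseExecutor:
    """Hypothetical stand-in for BaseExecutor, reduced to the bookkeeping in this issue."""

    def __init__(self) -> None:
        self.running: set[str] = set()
        self.event_buffer: dict[str, tuple[str, object]] = {}

    def fail(self, key: str, info: object = None) -> None:
        self.change_state(key, "failed", info)

    def change_state(self, key: str, state: str, info: object = None) -> None:
        try:
            self.running.remove(key)
        except KeyError:
            pass  # a key that never reached `running` is skipped silently
        # Assumed: the reported state is buffered for the scheduler to read later.
        self.event_buffer[key] = (state, info)


class ToyCeleryExecutor(ToyBaseExecutor):
    """Hypothetical stand-in for CeleryExecutor's `change_state` override."""

    def __init__(self) -> None:
        super().__init__()
        self.tasks: dict[str, str] = {}                # key -> Celery task id
        self.pending_deadlines: dict[str, float] = {}  # keys watched by the stall check

    def change_state(self, key: str, state: str, info: object = None) -> None:
        super().change_state(key, state, info)
        self.tasks.pop(key, None)
        # Stands in for `self._set_celery_pending_task_timeout(key, None)`.
        self.pending_deadlines.pop(key, None)


executor = ToyCeleryExecutor()
key = "my_dag.my_task.scheduled__2022-10-26T23:00:00+00:00.1"
executor.tasks[key] = "20ec4a6d-21b4-4838-b7f3-fb5d52c538ee"
executor.pending_deadlines[key] = 600.0
executor.fail(key)  # the key was never added to `running`
print(executor.event_buffer[key], key in executor.pending_deadlines)  # ('failed', None) False
```

Nothing in this chain writes the task instance's own state; per the issue, that is left `queued` while every executor-side record of the task, including the stall check, is cleared.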
Instead, when the `airflow tasks run` command fails, the Airflow task
instance should be failed or retried (if applicable).
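One way to express the desired outcome is a small reconciliation rule applied when the executor reports failure for a task instance that is still queued. This is only a sketch of the expected behavior, not Airflow's logic: `TaskInstanceView` and `reconcile` are hypothetical, and the retry rule (retry while `try_number` has not exceeded `max_tries`) is an assumption. The state strings `up_for_retry` and `failed` are Airflow task-instance states.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TaskInstanceView:
    """Hypothetical subset of the fields printed in the 'TaskInstance Finished' line."""

    state: str
    executor_state: str
    try_number: int
    max_tries: int


def reconcile(ti: TaskInstanceView) -> str:
    """Return the state a queued-but-failed task should move to (sketch only)."""
    if not (ti.state == "queued" and ti.executor_state == "failed"):
        return ti.state  # not the queued/failed mismatch; leave unchanged
    # Assumed retry rule: retry while try_number has not exceeded max_tries.
    return "up_for_retry" if ti.try_number <= ti.max_tries else "failed"


# Values taken from the scheduler log in this issue.
print(reconcile(TaskInstanceView("queued", "failed", try_number=1, max_tries=2)))  # up_for_retry
```

With `try_number=1` and `max_tries=2` from the log above, the task would be retried rather than left queued.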
_Originally posted by @RNHTTR in
https://github.com/apache/airflow/discussions/28022_