[
https://issues.apache.org/jira/browse/AIRFLOW-1143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986743#comment-15986743
]
Gerard Toonstra commented on AIRFLOW-1143:
------------------------------------------
The scheduler code, and the interaction between the workers and the task
instances they execute, is a really interesting and complex piece of code.
The heartbeats, by the way, do not check whether tasks are still really
executing; they only ensure that processes get started and results are
collected. From the perspective of the executor there are two possible
outcomes, success or failure, determined by the return value of the called
process.
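To make that concrete, here is a minimal sketch (my own illustrative names, not Airflow's actual executor API) of deriving a task outcome purely from the exit code of the launched process:

```python
import subprocess
import sys

def run_and_classify(cmd):
    """Run the command and map its exit code to a coarse outcome.

    Illustrative sketch: the executor never inspects what the task is
    doing, it only sees whether the process exited zero or nonzero.
    """
    proc = subprocess.run(cmd)
    # Zero exit code -> success; anything else (including an uncaught
    # exception in the child, which produces a nonzero code) -> failed.
    return "success" if proc.returncode == 0 else "failed"
```

So e.g. `run_and_classify([sys.executable, "-c", "pass"])` classifies as "success", while a child that calls `sys.exit(3)` classifies as "failed".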
This means that a return from, or an exception thrown in, the
models.TaskInstance.run() method ultimately determines whether something is
considered failing or succeeding. Within that method the worker has a
connection to the database, and from that perspective the method should set
the task instance state correctly; otherwise the task instance gets stuck.
Celery, I believe, has no capability to verify whether some task is still
running (anywhere) or not.
What happens here is that the run() method started OK, but it performs two
additional dependency checks, either of which can fail. One of them changes
the state to NONE, which I believe would cause the task instance to be rerun
for an active dagrun in the _process_task_instances method of SchedulerJob.py.
The first check, against QUEUE_DEPS, however does not set this state, so the
task would remain in QUEUED.
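A stripped-down sketch of that behaviour (illustrative only, not Airflow's real code) makes the asymmetry between the two checks visible:

```python
class FakeTaskInstance:
    """Toy model of the state handling described above."""

    def __init__(self):
        self.state = "queued"  # state when the worker picks the task up

    def run(self, queue_deps_ok, run_deps_ok):
        if not queue_deps_ok:
            # The problem discussed here: no state change on this path,
            # so the task instance stays QUEUED forever.
            return
        if not run_deps_ok:
            # This second check does reset the state, which lets the
            # scheduler reschedule the task for an active dagrun.
            self.state = None
            return
        self.state = "success"
```

Running it with a failing queue-deps check leaves `state == "queued"`, while a failing second check resets `state` to `None`.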
During the reaping process when the scheduler restarts, it checks each task
instance key against the executor.tasks_queued list (which is empty at that
point), so it picks up the task instance and sets its state to NONE there.
This is only done at scheduler start, because otherwise queued and scheduled
tasks would get removed.
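The reaping step can be sketched roughly like this (hypothetical names, assuming the state model described above):

```python
def reset_orphaned(task_instances, executor_queued_keys):
    """Sketch of startup reaping: any QUEUED task instance the executor
    does not know about is treated as an orphan and reset to NONE so the
    scheduler can pick it up again. Illustrative, not the real code.
    """
    for ti in task_instances:
        if ti["state"] == "queued" and ti["key"] not in executor_queued_keys:
            ti["state"] = None  # orphan: eligible for rescheduling
    return task_instances
```

Since the executor's queued list is empty right after a restart, every lingering QUEUED task instance gets reset, which is why restarting the scheduler "fixes" the stuck tasks.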
It sounds like setting the state to NONE in the TaskInstance.run() method
after the queue_dep_context check fails would be the quickest way to solve
this.
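In sketch form, the proposed quick fix (again with illustrative names, not the actual patch) would mirror what the second dependency check already does:

```python
def run_with_fix(ti, queue_deps_ok):
    """Sketch of run() after the proposed fix: a failed queue-dependency
    check resets the state to NONE instead of silently returning, so
    _process_task_instances can requeue the task. Illustrative only.
    """
    if not queue_deps_ok:
        ti["state"] = None  # was: left as "queued", stranding the task
        return ti
    ti["state"] = "running"
    return ti
```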
TaskInstance.run() is responsible for setting the final state on the TI in
the database, but there is a check in the SchedulerJob that verifies the
recorded state matches the state reported by the executor (in
_manage_executor_state). You'd then see this in the log: "Executor reports
task instance {} finished ({}) although the task says its {}."
For the completeness of this JIRA, it would also be helpful if you could
indicate whether or not you are seeing this:
FIXME: Rescheduling due to concurrency limits
Time for others to shine light on what I just said...
> Tasks rejected by workers get stuck in QUEUED
> ---------------------------------------------
>
> Key: AIRFLOW-1143
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1143
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Reporter: Dan Davydov
> Assignee: Gerard Toonstra
>
> If the scheduler schedules a task that is sent to a worker that then rejects
> the task (e.g. because one of the dependencies of the tasks became bad, like
> the pool became full), the task will be stuck in the QUEUED state. We hit
> this trying to switch from invoking the scheduler "airflow scheduler -n 5" to
> just "airflow scheduler".
> Restarting the scheduler fixes this because it cleans up orphans, but we
> shouldn't have to restart the scheduler to fix these problems (the missing
> job heartbeats should make the scheduler requeue the task).
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)