[ 
https://issues.apache.org/jira/browse/AIRFLOW-1143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15986743#comment-15986743
 ] 

Gerard Toonstra commented on AIRFLOW-1143:
------------------------------------------

The scheduler code, and the interaction between workers and the task instances 
they execute, is a really interesting and complex piece of code. 

The heartbeats, by the way, do not check whether tasks are still really 
executing; they just ensure that processes get started and results are 
collected. From the perspective of the executor, there are two outcomes: 
success or failure, determined by the return value of the called process.
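
As a rough illustration of that point (a sketch only, not Airflow's executor 
code): an executor-like helper that looks at nothing but the exit code of the 
process it started. The name run_and_classify and the state strings are made 
up for this example.

```python
import subprocess
import sys


def run_and_classify(command):
    """Run a command and map its exit code to a coarse state.

    Hypothetical helper: it knows nothing about what the process did,
    only whether it exited with 0 or not.
    """
    returncode = subprocess.call(command)
    # Exit code 0 counts as success; anything else counts as failure.
    return "success" if returncode == 0 else "failed"


ok = run_and_classify([sys.executable, "-c", "raise SystemExit(0)"])
bad = run_and_classify([sys.executable, "-c", "raise SystemExit(1)"])
print(ok, bad)
```

Anything more fine-grained than those two outcomes has to be recorded by the 
called process itself.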

This means that a return from, or an exception thrown in, the 
models.TaskInstance.run() method eventually determines whether something is 
considered failed or succeeded. Within that method, the worker has a 
connection to the database, so the method itself should set the task instance 
state correctly; otherwise the task instance gets stuck. Celery, I believe, 
has no way to verify whether a task is still running (anywhere) or not. 

What happens here is that the run() method started ok, but it then performs 
two additional dependency checks, either of which could fail. One of them 
changes the state to NONE, which I believe would get the task instance rerun 
for an active dagrun in the _process_task_instances method of the 
SchedulerJob. The first check (QUEUE_DEPS), however, does not set this state, 
so the task instance would remain in QUEUED. 
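
A toy model of the behaviour described above, assuming the two checks run in 
sequence. ToyTaskInstance, the boolean arguments and the state constants are 
all invented for illustration; this is not the real TaskInstance.run().

```python
QUEUED, RUNNING, NONE = "queued", "running", None


class ToyTaskInstance:
    """Simplified stand-in for a task instance; not Airflow's model class."""

    def __init__(self):
        self.state = QUEUED

    def run(self, queue_deps_met, run_deps_met):
        # First check: on failure the method returns without touching the
        # state, so the task instance stays in QUEUED.
        if not queue_deps_met:
            return
        # Second check: on failure the state is reset to NONE, which lets
        # the scheduler consider the task instance again.
        if not run_deps_met:
            self.state = NONE
            return
        self.state = RUNNING


stuck = ToyTaskInstance()
stuck.run(queue_deps_met=False, run_deps_met=True)

released = ToyTaskInstance()
released.run(queue_deps_met=True, run_deps_met=False)

print(stuck.state, released.state)
```

Only the second failure path hands the task instance back to the scheduler; 
the first leaves it in QUEUED with nobody working on it.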

In the reaping process when the scheduler restarts, it checks each task 
instance key against the executor.tasks_queued list (which is empty at that 
point), picks up every task instance not found there, and sets its state to 
NONE. This is only done at the start of the scheduler, because at any other 
time legitimately queued and scheduled tasks would get reset as well.
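
The reaping step could be sketched roughly like this. The function name, the 
dict-based bookkeeping and the state constants are assumptions made for the 
example, not the scheduler's real data structures.

```python
QUEUED, SCHEDULED, NONE = "queued", "scheduled", None


def reset_orphaned_states(task_states, executor_queued_keys):
    """Reset queued/scheduled task instances the executor does not know about.

    task_states maps a task instance key to its recorded state. Both the
    function and the data layout are hypothetical.
    """
    reset_keys = []
    for key, state in task_states.items():
        if state in (QUEUED, SCHEDULED) and key not in executor_queued_keys:
            task_states[key] = NONE
            reset_keys.append(key)
    return reset_keys


states = {("dag", "task_a"): QUEUED, ("dag", "task_b"): "running"}
# Right after a scheduler restart the executor's queue is still empty.
orphans = reset_orphaned_states(states, executor_queued_keys=set())
print(orphans)
```

With an empty executor queue, every queued or scheduled task instance looks 
like an orphan, which is why this is only safe right after startup.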

It sounds like setting the state to NONE in the TaskInstance.run() method 
after the queue_dep_context check fails could be the quickest way to solve this.
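
In sketch form, the suggested change amounts to something like the following. 
state_after_rejection is a hypothetical helper and not a proposed patch; the 
real change would live inside TaskInstance.run().

```python
QUEUED, NONE = "queued", None


def state_after_rejection(current_state, queue_deps_met):
    """Return the state a worker would leave behind under the suggested fix.

    Hypothetical helper for illustration only.
    """
    if not queue_deps_met:
        # Suggested change: hand the task instance back to the scheduler
        # instead of leaving it in QUEUED.
        return NONE
    return current_state


print(state_after_rejection(QUEUED, queue_deps_met=False))
```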

TaskInstance.run() is responsible for setting the final state on the TI in 
the database, but there is a check in the SchedulerJob (in 
_manage_executor_state) of whether the recorded state matches the state 
reported by the executor. If they differ, you'd see this in the log: 

Executor reports task instance {} finished ({}) although the task says its {}.

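A simplified sketch of such a consistency check, assuming the executor's 
results and the recorded states are both available as plain dicts keyed by 
task instance. find_state_mismatches is an invented name, and the real 
_manage_executor_state may apply narrower conditions than a plain inequality.

```python
import logging


def find_state_mismatches(executor_events, recorded_states):
    """Return keys whose executor-reported state differs from the recorded one.

    Hypothetical helper; both arguments map a task instance key to a state.
    """
    mismatches = []
    for key, reported in executor_events.items():
        recorded = recorded_states.get(key)
        if recorded != reported:
            logging.warning(
                "Executor reports task instance %s finished (%s) "
                "although the task says its %s.",
                key, reported, recorded,
            )
            mismatches.append(key)
    return mismatches


events = {"task_a": "success", "task_b": "success"}
recorded = {"task_a": "success", "task_b": "queued"}
print(find_state_mismatches(events, recorded))
```
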
For the completeness of this JIRA, it would also be helpful if you could 
indicate whether or not you are seeing this: 

FIXME: Rescheduling due to concurrency limits

Time for others to shine light on what I just said...

> Tasks rejected by workers get stuck in QUEUED
> ---------------------------------------------
>
>                 Key: AIRFLOW-1143
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1143
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>            Reporter: Dan Davydov
>            Assignee: Gerard Toonstra
>
> If the scheduler schedules a task that is sent to a worker that then rejects 
> the task (e.g. because one of the dependencies of the tasks became bad, like 
> the pool became full), the task will be stuck in the QUEUED state. We hit 
> this trying to switch from invoking the scheduler "airflow scheduler -n 5" to 
> just "airflow scheduler".
> Restarting the scheduler fixes this because it cleans up orphans, but we 
> shouldn't have to restart the scheduler to fix these problems (the missing 
> job heartbeats should make the scheduler requeue the task).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
