[ 
https://issues.apache.org/jira/browse/AIRFLOW-1463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16105327#comment-16105327
 ] 

George Leslie-Waksman commented on AIRFLOW-1463:
------------------------------------------------

This is a scheduler / executor bug.

The crux of the problem is that the celery executor maintains internal state on 
what it has sent to be scheduled. For certain task instance states, as recorded 
in the metadata db, the scheduler will not attempt to reschedule until the 
state has changed. If the worker that picks up the task instance fails before 
it is able to change the task instance state in the metadata db, the task will 
get stuck in a "QUEUED" state.
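
The failure mode described above can be illustrated with a toy model. This is only a sketch and not Airflow code: `ToyExecutor`, `scheduler_pass` and `worker_crashes_on_pickup` are hypothetical names, and the real scheduler and celery executor are considerably more involved.

```python
QUEUED = "queued"


class ToyExecutor:
    """Hypothetical stand-in for an executor that remembers what it has sent."""

    def __init__(self):
        self.sent = set()   # internal state: keys already handed to the queue
        self.broker = []    # stand-in for the message queue

    def send(self, key):
        if key in self.sent:
            return False    # already sent once, so it is never sent again
        self.sent.add(key)
        self.broker.append(key)
        return True


def scheduler_pass(db, executor):
    """Queue task instances with no state; skip ones already marked QUEUED."""
    sent = []
    for key in sorted(db):
        if db[key] is None:
            db[key] = QUEUED
            if executor.send(key):
                sent.append(key)
    return sent


def worker_crashes_on_pickup(executor):
    """The message leaves the queue, but the worker dies before updating the db."""
    return executor.broker.pop(0)


db = {"task_a": None}                    # stand-in for the metadata db
executor = ToyExecutor()
first = scheduler_pass(db, executor)     # task_a is sent to the queue
worker_crashes_on_pickup(executor)       # message consumed, db never updated
second = scheduler_pass(db, executor)    # nothing is sent: task_a is stuck
```

After the second pass the queue is empty, the db still says QUEUED, and the executor still believes the task has been sent, so nothing will ever pick it up again.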

As a workaround, you can restart the scheduler to clear its internal state. In 
1.8.1, the -r option can be used to automatically restart at a regular interval.

In order to fix this issue, the celery executor will need to be modified to: a) 
regularly clear its internal state; b) synchronize its internal state with the 
state of the message queue; or c) use message queue state directly to determine 
what has been queued.
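
As a rough illustration of option (b), the sketch below compares the executor's internal record with what is still on the message queue. It is not the celery executor's API: `find_lost`, `sent_at`, `broker_keys`, `db_states` and `grace_seconds` are all hypothetical, and the sketch assumes the set of keys currently on the queue can be obtained somehow, which depends on the broker.

```python
def find_lost(sent_at, broker_keys, db_states, now, grace_seconds=300):
    """Return keys that were sent, are no longer on the queue, and are still
    QUEUED in the db after a grace period; forget them in the internal state.

    sent_at:     dict mapping task key -> time the executor sent it (hypothetical)
    broker_keys: set of task keys currently on the message queue (assumed available)
    db_states:   dict mapping task key -> state recorded in the metadata db
    """
    lost = set()
    for key, sent_time in sent_at.items():
        still_on_queue = key in broker_keys
        still_queued_in_db = db_states.get(key) == "queued"
        waited_long_enough = now - sent_time > grace_seconds
        if not still_on_queue and still_queued_in_db and waited_long_enough:
            lost.add(key)
    for key in lost:
        del sent_at[key]    # clear internal state so the task can be re-sent
    return lost


sent_at = {"a": 0.0, "b": 0.0, "c": 900.0}
broker_keys = {"b"}                       # "b" is still waiting on the queue
db_states = {"a": "queued", "b": "queued", "c": "queued"}
lost = find_lost(sent_at, broker_keys, db_states, now=1000.0)
```

The grace period is there because a worker may have just taken the message and not yet updated the db. It narrows that window but does not close it, so a real fix would still need to guard against running the same task twice.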

Your proposed fix may ameliorate the issue in your particular case but it 
introduces a number of race conditions around scheduler restarts that break 
some of the existing protections against running the same task multiple times.

It might clarify matters if you changed the title of this issue to something 
like "Scheduler does not reschedule tasks in QUEUED state".

> Clear state of queued task when it fails due to DAG import error
> ----------------------------------------------------------------
>
>                 Key: AIRFLOW-1463
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1463
>             Project: Apache Airflow
>          Issue Type: Improvement
>          Components: cli
>         Environment: Ubuntu 14.04
> Airflow 1.8.0
> SQS backed task queue, AWS RDS backed meta storage
> DAG folder is synced by script on code push: archive is downloaded from s3, 
> unpacked, moved, install script is run. airflow executable is replaced with 
> symlink pointing to the latest version of code, no airflow processes are 
> restarted.
>            Reporter: Stanislav Pak
>            Assignee: Stanislav Pak
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Our pipeline-related code is deployed almost simultaneously on all airflow 
> boxes: the scheduler+webserver box and the worker boxes. A common python 
> package is deployed on those boxes on every other code push (3-5 deployments 
> per hour). Due to installation specifics, a DAG that imports a module from 
> that package might fail to import. If the DAG import fails when a worker 
> runs a task, the task is still removed from the queue but the task state is 
> not changed, so the task stays in the QUEUED state forever.
> Besides the described case, there is a scenario where this happens because 
> of DAG update lag in the scheduler: a task can be scheduled with the old DAG 
> and the worker can run the task with a new DAG that fails to import.
> There might be other scenarios in which this happens.
> Proposal:
> Catch errors when importing the DAG on task run and clear the task instance 
> state if the import fails. This should fix transient issues of this kind.



