[ https://issues.apache.org/jira/browse/AIRFLOW-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
George Roldugin updated AIRFLOW-2145: ------------------------------------- Priority: Minor (was: Major) > Deadlock after clearing a running task > -------------------------------------- > > Key: AIRFLOW-2145 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2145 > Project: Apache Airflow > Issue Type: Bug > Affects Versions: 1.9.0 > Reporter: George Roldugin > Priority: Minor > Attachments: image-2018-02-23-18-59-11-828.png, > image-2018-02-23-19-00-37-741.png, image-2018-02-23-19-00-55-630.png, > image-2018-02-23-19-01-45-012.png, image-2018-02-23-19-01-57-498.png, > image-2018-02-23-19-02-18-837.png > > > TL;DR The essense of the issue is that whenever a currently running ask is > cleared, the dagrun enters a deadlocked state and fails. > > We see this in production with Celery executors and {{TimeDeltaSensor}}, and > I've been able to reproduce it locally with both {{TimeDeltaSensor}} and > {{WebHDFSSensor}}. > Here's the minimal example: > {code:java} > from datetime import datetime, timedelta > import airflow > from airflow.operators.sensors import TimeDeltaSensor > from airflow.operators.dummy_operator import DummyOperator > with airflow.DAG( > 'foo', > schedule_interval='@daily', > start_date=datetime(2018, 1, 1)) as dag: > wait_for_upstream_sla = TimeDeltaSensor( > task_id="wait_for_upstream_sla", > delta=timedelta(days=365*10) > ) > do_work = DummyOperator(task_id='do_work') > dag >> wait_for_upstream_sla >> do_work > {code} > > Sequence of actions, relevant DEBUG level logs, and some UI screenshots > {code:java} > airflow clear foo -e 2018-02-22 --no_confirm && airflow backfill foo -s > 2018-02-22 -e 2018-02-22{code} > {code:java} > [2018-02-23 17:17:45,983] {__init__.py:45} INFO - Using executor > SequentialExecutor > [2018-02-23 17:17:46,069] {models.py:189} INFO - Filling up the DagBag from > /Users/grol/Drive/dev/reporting/dags > ... > [2018-02-23 17:17:47,563] {jobs.py:2180} DEBUG - Task instance to run > <TaskInstance: foo.wait_for_upstream_sla 2018-02-22 00:00:00 [scheduled]> > state scheduled > ... > {code} > !image-2018-02-23-18-59-11-828.png|width=418,height=87! > Now we clear all DAG's tasks externally: > {code:java} > airflow clear foo -e 2018-02-22 --no_confirm > {code} > This causes the following: > {code:java} > [2018-02-23 17:17:55,258] {base_task_runner.py:98} INFO - Subtask: > [2018-02-23 17:17:55,258] {sensors.py:629} INFO - Checking if the time > (2018-02-23 16:19:00) has come > [2018-02-23 17:17:58,844] {jobs.py:184} DEBUG - [heart] Boom. > [2018-02-23 17:18:03,848] {jobs.py:184} DEBUG - [heart] Boom. > [2018-02-23 17:18:08,856] {jobs.py:2585} WARNING - State of this instance has > been externally set to shutdown. Taking the poison pill. > [2018-02-23 17:18:08,874] {helpers.py:266} DEBUG - There are no descendant > processes to kill > [2018-02-23 17:18:08,875] {jobs.py:184} DEBUG - [heart] Boom. > [2018-02-23 17:18:08,900] {helpers.py:266} DEBUG - There are no descendant > processes to kill > [2018-02-23 17:18:08,922] {helpers.py:266} DEBUG - There are no descendant > processes to kill > [2018-02-23 17:18:09,005] {sequential_executor.py:47} ERROR - Failed to > execute task Command 'airflow run foo wait_for_upstream_sla > 2018-02-22T00:00:00 --local -sd DAGS_FOLDER/foo.py' returned non-zero exit > status 1. > [2018-02-23 17:18:09,012] {jobs.py:2004} DEBUG - Executor state: failed task > <TaskInstance: foo.wait_for_upstream_sla 2018-02-22 00:00:00 [shutdown]> > [2018-02-23 17:18:09,018] {models.py:4584} INFO - Updating state for <DagRun > foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally > triggered: False> considering 2 task(s) > [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, > The task did not have depends_on_past set. > [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: True, > The context specified that being in a retry period was permitted. > [2018-02-23 17:18:09,027] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's > trigger rule 'all_success' requires all upstream tasks to have succeeded, but > found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, > 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, > upstream_task_ids=['wait_for_upstream_sla'] > [2018-02-23 17:18:09,029] {models.py:4643} INFO - Deadlock; marking run > <DagRun foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally > triggered: False> failed > [2018-02-23 17:18:09,045] {jobs.py:2125} INFO - [backfill progress] | > finished run 1 of 1 | tasks waiting: 1 | succeeded: 0 | kicked_off: 1 | > failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1 > [2018-02-23 17:18:09,045] {jobs.py:2129} DEBUG - Finished dag run loop > iteration. Remaining tasks [<TaskInstance: foo.do_work 2018-02-22 00:00:00 > [scheduled]>] > [2018-02-23 17:18:09,045] {jobs.py:2160} DEBUG - *** Clearing out not_ready > list *** > [2018-02-23 17:18:09,048] {jobs.py:2180} DEBUG - Task instance to run > <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> state None > [2018-02-23 17:18:09,049] {jobs.py:2186} WARNING - FIXME: task instance {} > state was set to None externally. This should not happen > [2018-02-23 17:18:09,053] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [scheduled]> dependency 'Task Instance State' PASSED: > True, Task state scheduled was valid. > [2018-02-23 17:18:09,053] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [scheduled]> dependency 'Previous Dagrun State' PASSED: > True, The task did not have depends_on_past set. > [2018-02-23 17:18:09,056] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [scheduled]> dependency 'Task Concurrency' PASSED: True, > Task concurrency is not set. > [2018-02-23 17:18:09,056] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [scheduled]> dependency 'Not In Retry Period' PASSED: > True, The task instance was not marked for retrying. > [2018-02-23 17:18:09,061] {models.py:1215} DEBUG - <TaskInstance: foo.do_work > 2018-02-22 00:00:00 [scheduled]> dependency 'Trigger Rule' PASSED: False, > Task's trigger rule 'all_success' requires all upstream tasks to have > succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, > 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, > upstream_task_ids=['wait_for_upstream_sla'] > [2018-02-23 17:18:09,061] {models.py:1190} INFO - Dependencies not met for > <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]>, dependency > 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all > upstream tasks to have succeeded, but found 1 non-success(es). > upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, > 'upstream_failed': 0, 'done': 0, 'total': 1}, > upstream_task_ids=['wait_for_upstream_sla'] > [2018-02-23 17:18:09,061] {jobs.py:2274} DEBUG - Adding <TaskInstance: > foo.do_work 2018-02-22 00:00:00 [scheduled]> to not_ready > [2018-02-23 17:18:09,067] {jobs.py:184} DEBUG - [heart] Boom. > {code} > !image-2018-02-23-19-00-37-741.png|width=375,height=78! > !image-2018-02-23-19-01-57-498.png|width=374,height=77! > Interestingly, once the success condition of the {{TimeDeltaSensor}} is met, > in production we see the following final state in the UI: DAG failed, while > the {{TimeDeltaSensor}} task succeeded, though there's no evidence of success > in the celery executors logs. > !image-2018-02-23-19-02-18-837.png|width=563,height=87! -- This message was sent by Atlassian JIRA (v7.6.3#76005)