[https://issues.apache.org/jira/browse/AIRFLOW-2145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16599411#comment-16599411]
ASF GitHub Bot commented on AIRFLOW-2145:
-----------------------------------------
kaxil closed pull request #3657: [AIRFLOW-2145] fix deadlock on clearing running task instance
URL: https://github.com/apache/incubator-airflow/pull/3657
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/airflow/utils/state.py b/airflow/utils/state.py
index 9da98510eb..a351df07b9 100644
--- a/airflow/utils/state.py
+++ b/airflow/utils/state.py
@@ -7,9 +7,9 @@
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
-#
+#
# http://www.apache.org/licenses/LICENSE-2.0
-#
+#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -101,7 +101,6 @@ def finished(cls):
         """
         return [
             cls.SUCCESS,
-            cls.SHUTDOWN,
             cls.FAILED,
             cls.SKIPPED,
         ]
@@ -117,5 +116,6 @@ def unfinished(cls):
             cls.SCHEDULED,
             cls.QUEUED,
             cls.RUNNING,
+            cls.SHUTDOWN,
             cls.UP_FOR_RETRY
         ]
diff --git a/tests/models.py b/tests/models.py
index 1c88ea47f7..529ae56454 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -801,7 +801,26 @@ def test_dagrun_deadlock(self):
         dr.update_state()
         self.assertEqual(dr.state, State.FAILED)

-    def test_dagrun_no_deadlock(self):
+    def test_dagrun_no_deadlock_with_shutdown(self):
+        session = settings.Session()
+        dag = DAG('test_dagrun_no_deadlock_with_shutdown',
+                  start_date=DEFAULT_DATE)
+        with dag:
+            op1 = DummyOperator(task_id='upstream_task')
+            op2 = DummyOperator(task_id='downstream_task')
+            op2.set_upstream(op1)
+
+        dr = dag.create_dagrun(run_id='test_dagrun_no_deadlock_with_shutdown',
+                               state=State.RUNNING,
+                               execution_date=DEFAULT_DATE,
+                               start_date=DEFAULT_DATE)
+        upstream_ti = dr.get_task_instance(task_id='upstream_task')
+        upstream_ti.set_state(State.SHUTDOWN, session=session)
+
+        dr.update_state()
+        self.assertEqual(dr.state, State.RUNNING)
+
+    def test_dagrun_no_deadlock_with_depends_on_past(self):
         session = settings.Session()
         dag = DAG('test_dagrun_no_deadlock',
                   start_date=DEFAULT_DATE)
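Why moving SHUTDOWN out of finished() avoids the false deadlock can be illustrated with a simplified, hypothetical model of the check (this is not Airflow's DagRun.update_state code): a run is treated as deadlocked when some task instances are in an unfinished state but none of them has its dependencies met. Only the all_success trigger rule is modeled, the state strings mirror airflow.utils.state, and the presence of None at the start of both unfinished lists is an assumption, since the diff context does not show the beginning of that list.

```python
# Simplified, hypothetical model of the DagRun deadlock check. This is NOT
# Airflow's implementation; it only illustrates the effect of the state-list change.
SUCCESS, FAILED, SKIPPED, SHUTDOWN = "success", "failed", "skipped", "shutdown"
SCHEDULED, QUEUED, RUNNING = "scheduled", "queued", "running"
UP_FOR_RETRY = "up_for_retry"

# Before the fix (1.9.0): SHUTDOWN counted as finished, so it is absent here.
# None ("no state yet") is assumed to be part of both lists.
UNFINISHED_BEFORE = [None, SCHEDULED, QUEUED, RUNNING, UP_FOR_RETRY]
# After the fix: SHUTDOWN counts as unfinished.
UNFINISHED_AFTER = [None, SCHEDULED, QUEUED, RUNNING, SHUTDOWN, UP_FOR_RETRY]


def all_success_met(task_id, states, upstream):
    """Only the 'all_success' trigger rule is modeled: every upstream succeeded."""
    return all(states[u] == SUCCESS for u in upstream.get(task_id, []))


def is_deadlocked(states, upstream, unfinished_states):
    """Deadlock: some tasks are unfinished, but none of them can run."""
    unfinished = [t for t, s in states.items() if s in unfinished_states]
    if not unfinished:
        return False
    return not any(all_success_met(t, states, upstream) for t in unfinished)


# Scenario from the issue and the new test: the upstream task was cleared while
# running (so it is SHUTDOWN) and the downstream task has no state yet.
states = {"wait_for_upstream_sla": SHUTDOWN, "do_work": None}
upstream = {"do_work": ["wait_for_upstream_sla"]}

print(is_deadlocked(states, upstream, UNFINISHED_BEFORE))  # True: run marked failed
print(is_deadlocked(states, upstream, UNFINISHED_AFTER))   # False: run stays running
```

With SHUTDOWN counted as finished, the only unfinished task in the model is do_work, whose upstream is not a success, so a deadlock is reported. With SHUTDOWN counted as unfinished, the cleared upstream task is itself a candidate to run again, which matches the new test's expectation that the run stays RUNNING.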
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
> Deadlock after clearing a running task
> --------------------------------------
>
> Key: AIRFLOW-2145
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2145
> Project: Apache Airflow
> Issue Type: Bug
> Affects Versions: 1.9.0
> Reporter: George Roldugin
> Priority: Minor
> Fix For: 1.10.1
>
> Attachments: image-2018-02-23-18-59-11-828.png,
> image-2018-02-23-19-00-37-741.png, image-2018-02-23-19-00-55-630.png,
> image-2018-02-23-19-01-45-012.png, image-2018-02-23-19-01-57-498.png,
> image-2018-02-23-19-02-18-837.png
>
>
> TL;DR The essence of the issue is that whenever a currently running task is
> cleared, the DAG run enters a deadlocked state and fails.
>
> We see this in production with the Celery executor and {{TimeDeltaSensor}}, and
> I've been able to reproduce it locally with both {{TimeDeltaSensor}} and
> {{WebHDFSSensor}}.
> Here's a minimal example:
> {code:java}
> from datetime import datetime, timedelta
>
> import airflow
> from airflow.operators.sensors import TimeDeltaSensor
> from airflow.operators.dummy_operator import DummyOperator
>
> with airflow.DAG(
>         'foo',
>         schedule_interval='@daily',
>         start_date=datetime(2018, 1, 1)) as dag:
>     wait_for_upstream_sla = TimeDeltaSensor(
>         task_id="wait_for_upstream_sla",
>         delta=timedelta(days=365*10)
>     )
>     do_work = DummyOperator(task_id='do_work')
>     dag >> wait_for_upstream_sla >> do_work
> {code}
>
> Sequence of actions, relevant DEBUG-level logs, and some UI screenshots:
> {code:java}
> airflow clear foo -e 2018-02-22 --no_confirm && airflow backfill foo -s 2018-02-22 -e 2018-02-22
> {code}
> {code:java}
> [2018-02-23 17:17:45,983] {__init__.py:45} INFO - Using executor SequentialExecutor
> [2018-02-23 17:17:46,069] {models.py:189} INFO - Filling up the DagBag from /Users/grol/Drive/dev/reporting/dags
> ...
> [2018-02-23 17:17:47,563] {jobs.py:2180} DEBUG - Task instance to run <TaskInstance: foo.wait_for_upstream_sla 2018-02-22 00:00:00 [scheduled]> state scheduled
> ...
> {code}
> !image-2018-02-23-18-59-11-828.png|width=418,height=87!
> Now we clear all of the DAG's tasks externally:
> {code:java}
> airflow clear foo -e 2018-02-22 --no_confirm
> {code}
> This causes the following:
> {code:java}
> [2018-02-23 17:17:55,258] {base_task_runner.py:98} INFO - Subtask: [2018-02-23 17:17:55,258] {sensors.py:629} INFO - Checking if the time (2018-02-23 16:19:00) has come
> [2018-02-23 17:17:58,844] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:03,848] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:08,856] {jobs.py:2585} WARNING - State of this instance has been externally set to shutdown. Taking the poison pill.
> [2018-02-23 17:18:08,874] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:08,875] {jobs.py:184} DEBUG - [heart] Boom.
> [2018-02-23 17:18:08,900] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:08,922] {helpers.py:266} DEBUG - There are no descendant processes to kill
> [2018-02-23 17:18:09,005] {sequential_executor.py:47} ERROR - Failed to execute task Command 'airflow run foo wait_for_upstream_sla 2018-02-22T00:00:00 --local -sd DAGS_FOLDER/foo.py' returned non-zero exit status 1.
> [2018-02-23 17:18:09,012] {jobs.py:2004} DEBUG - Executor state: failed task <TaskInstance: foo.wait_for_upstream_sla 2018-02-22 00:00:00 [shutdown]>
> [2018-02-23 17:18:09,018] {models.py:4584} INFO - Updating state for <DagRun foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally triggered: False> considering 2 task(s)
> [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2018-02-23 17:18:09,021] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> dependency 'Not In Retry Period' PASSED: True, The context specified that being in a retry period was permitted.
> [2018-02-23 17:18:09,027] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
> [2018-02-23 17:18:09,029] {models.py:4643} INFO - Deadlock; marking run <DagRun foo @ 2018-02-22 00:00:00: backfill_2018-02-22T00:00:00, externally triggered: False> failed
> [2018-02-23 17:18:09,045] {jobs.py:2125} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 1 | succeeded: 0 | kicked_off: 1 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
> [2018-02-23 17:18:09,045] {jobs.py:2129} DEBUG - Finished dag run loop iteration. Remaining tasks [<TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]>]
> [2018-02-23 17:18:09,045] {jobs.py:2160} DEBUG - *** Clearing out not_ready list ***
> [2018-02-23 17:18:09,048] {jobs.py:2180} DEBUG - Task instance to run <TaskInstance: foo.do_work 2018-02-22 00:00:00 [None]> state None
> [2018-02-23 17:18:09,049] {jobs.py:2186} WARNING - FIXME: task instance {} state was set to None externally. This should not happen
> [2018-02-23 17:18:09,053] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Task Instance State' PASSED: True, Task state scheduled was valid.
> [2018-02-23 17:18:09,053] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Previous Dagrun State' PASSED: True, The task did not have depends_on_past set.
> [2018-02-23 17:18:09,056] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Task Concurrency' PASSED: True, Task concurrency is not set.
> [2018-02-23 17:18:09,056] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Not In Retry Period' PASSED: True, The task instance was not marked for retrying.
> [2018-02-23 17:18:09,061] {models.py:1215} DEBUG - <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> dependency 'Trigger Rule' PASSED: False, Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
> [2018-02-23 17:18:09,061] {models.py:1190} INFO - Dependencies not met for <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]>, dependency 'Trigger Rule' FAILED: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'skipped': 0, 'successes': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0, 'total': 1}, upstream_task_ids=['wait_for_upstream_sla']
> [2018-02-23 17:18:09,061] {jobs.py:2274} DEBUG - Adding <TaskInstance: foo.do_work 2018-02-22 00:00:00 [scheduled]> to not_ready
> [2018-02-23 17:18:09,067] {jobs.py:184} DEBUG - [heart] Boom.
> {code}
> !image-2018-02-23-19-00-37-741.png|width=375,height=78!
> !image-2018-02-23-19-01-57-498.png|width=374,height=77!
> Interestingly, once the success condition of the {{TimeDeltaSensor}} is met,
> in production we see the following final state in the UI: the DAG run failed,
> while the {{TimeDeltaSensor}} task succeeded, although there is no evidence of
> success in the Celery executor logs.
> !image-2018-02-23-19-02-18-837.png|width=563,height=87!
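The 'Trigger Rule' failure in the logs above follows directly from the upstream_tasks_state counts: the single upstream task is in the shutdown state, which the log counts neither as a success nor as done. A minimal, hypothetical sketch of the all_success check (an assumption about the logic, not Airflow's trigger-rule dependency source) treats every upstream task that is not a success as a non-success:

```python
# Hypothetical sketch of the 'all_success' trigger-rule check, driven by the
# upstream_tasks_state counts printed in the log. Not Airflow's source code.
def all_success_check(upstream_tasks_state):
    """Return (passed, number_of_non_successes) for the all_success rule."""
    non_successes = upstream_tasks_state["total"] - upstream_tasks_state["successes"]
    return non_successes == 0, non_successes


# Counts copied from the log: the only upstream task (wait_for_upstream_sla,
# in state shutdown) contributes to 'total' but not to 'successes' or 'done'.
logged = {'skipped': 0, 'successes': 0, 'failed': 0,
          'upstream_failed': 0, 'done': 0, 'total': 1}

passed, non_successes = all_success_check(logged)
print("PASSED: {}, found {} non-success(es)".format(passed, non_successes))
# prints: PASSED: False, found 1 non-success(es)
```

Because nothing in this check moves the upstream task out of the shutdown state, do_work cannot pass it on its own, which is why the run is flagged as deadlocked rather than simply waiting.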
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)