Repository: incubator-airflow
Updated Branches:
  refs/heads/master 23068924c -> 15ff540ec


[AIRFLOW-678] Prevent scheduler from double triggering TIs

At the moment there is no lock/synchronization
around the loop where the scheduler puts tasks in
the SCHEDULED state. This means that if a task
starts running or gets scheduled somewhere else
(e.g. by manually running it via the webserver),
its state can be changed from RUNNING/QUEUED back
to SCHEDULED, which can cause a single task
instance to be run twice at the same time.
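
In outline, the fix locks the task instance row and re-checks its
queueing dependencies before moving it to SCHEDULED, so a task that
is already RUNNING or QUEUED elsewhere never has its state clobbered.
A minimal sketch of the pattern, condensed from the diff below (the
helper name and exact wrapping are illustrative, not part of the
commit):

    from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS
    from airflow.utils.state import State

    def schedule_if_runnable(ti, session):
        # Row-level lock: no other process can flip the state
        # between our check and our write.
        ti.refresh_from_db(session=session, lock_for_update=True)

        # Re-check only the queueing deps (which include "not
        # already running"); full task deps are deferred to the
        # workers since they are expensive in the scheduler.
        dep_context = DepContext(deps=QUEUE_DEPS,
                                 ignore_task_deps=True)
        if ti.are_dependencies_met(dep_context=dep_context,
                                   session=session,
                                   verbose=True):
            # Safe: the task is not RUNNING/QUEUED anywhere else.
            ti.state = State.SCHEDULED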

Testing Done:
- Tested this branch on the Airbnb Airflow staging
cluster
- Airbnb has been running very similar logic in
production for many months (not 1-to-1, since we
are still running off of the last release branch)
- Ideally we need an integration test to catch
double triggers, but this is not trivial to do
properly

Closes #1924 from
aoen/ddavydov/fix_scheduler_race_condition


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15ff540e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15ff540e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15ff540e

Branch: refs/heads/master
Commit: 15ff540ecd5e60e7ce080177ea3ea227582a4672
Parents: 2306892
Author: Dan Davydov <dan.davy...@airbnb.com>
Authored: Mon Dec 12 12:10:15 2016 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Dec 12 12:10:38 2016 -0800

----------------------------------------------------------------------
 airflow/jobs.py                | 24 ++++++++++++++++++++----
 airflow/ti_deps/dep_context.py | 12 ++++--------
 2 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15ff540e/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 22cdeb0..229424e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -45,7 +45,7 @@ from airflow import configuration as conf
 from airflow.exceptions import AirflowException
 from airflow.models import DagRun
 from airflow.settings import Stats
-from airflow.ti_deps.dep_context import RUN_DEPS, DepContext
+from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
 from airflow.utils.state import State
 from airflow.utils.db import provide_session, pessimistic_connection_handling
 from airflow.utils.dag_processing import (AbstractDagFileProcessor,
@@ -1532,9 +1532,25 @@ class SchedulerJob(BaseJob):
             dag = dagbag.dags[ti_key[0]]
             task = dag.get_task(ti_key[1])
             ti = models.TaskInstance(task, ti_key[2])
-            # Task starts out in the scheduled state. All tasks in the
-            # scheduled state will be sent to the executor
-            ti.state = State.SCHEDULED
+
+            ti.refresh_from_db(session=session, lock_for_update=True)
+            # We can defer the task dependency checks to the workers
+            # themselves since they can be expensive to run in the scheduler.
+            dep_context = DepContext(deps=QUEUE_DEPS, ignore_task_deps=True)
+
+            # Only schedule tasks that have their dependencies met, e.g. to
+            # avoid a task that recently got its state changed to RUNNING
+            # from somewhere other than the scheduler from having its state
+            # overwritten.
+            # TODO(aoen): It's not great that we have to check all the task
+            # instance dependencies twice; once to get the task scheduled,
+            # and again to actually run the task. We should try to come up
+            # with a way to only check them once.
+            if ti.are_dependencies_met(
+                    dep_context=dep_context,
+                    session=session,
+                    verbose=True):
+                # Task starts out in the scheduled state. All tasks in the
+                # scheduled state will be sent to the executor
+                ti.state = State.SCHEDULED
 
             # Also save this task instance to the DB.
             self.logger.info("Creating / updating {} in ORM".format(ti))

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15ff540e/airflow/ti_deps/dep_context.py
----------------------------------------------------------------------
diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py
index 73ae924..583099d 100644
--- a/airflow/ti_deps/dep_context.py
+++ b/airflow/ti_deps/dep_context.py
@@ -81,17 +81,13 @@ QUEUEABLE_STATES = {
     State.UP_FOR_RETRY,
 }
 
-# The minimum execution context for task instances to be executed.
-MIN_EXEC_DEPS = {
+# Context to get the dependencies that need to be met in order for a task
+# instance to be backfilled.
+QUEUE_DEPS = {
     NotRunningDep(),
     NotSkippedDep(),
     RunnableExecDateDep(),
-}
-
-# Context to get the dependencies that need to be met in order for a task
-# instance to be backfilled.
-QUEUE_DEPS = MIN_EXEC_DEPS | {
-    ValidStateDep(QUEUEABLE_STATES)
+    ValidStateDep(QUEUEABLE_STATES),
 }
 
 # Dependencies that need to be met for a given task instance to be able to get run by an
