Repository: incubator-airflow Updated Branches: refs/heads/master 23068924c -> 15ff540ec
[AIRFLOW-678] Prevent scheduler from double triggering TIs At the moment there is no lock/synchronization around the loop where the scheduler puts tasks in the SCHEDULED state. This means that if somehow the task starts running or gets SCHEDULED somewhere else somehow (e.g. manually running a task via the webserver) the task can have its state changed from RUNNING/QUEUED to SCHEDULED which can cause a single task instance to be run twice at the same time. Testing Done: - Tested this branch on the Airbnb Airflow staging cluster - Airbnb has been running very similar logic in our production for many months (not 1-1 since we are still running off of the last release branch) - In the future we ideally need an integration test to catch double triggers but this is not trivial to do properly Closes #1924 from aoen/ddavydov/fix_scheduler_race_condition Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15ff540e Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15ff540e Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15ff540e Branch: refs/heads/master Commit: 15ff540ecd5e60e7ce080177ea3ea227582a4672 Parents: 2306892 Author: Dan Davydov <dan.davy...@airbnb.com> Authored: Mon Dec 12 12:10:15 2016 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Dec 12 12:10:38 2016 -0800 ---------------------------------------------------------------------- airflow/jobs.py | 24 ++++++++++++++++++++---- airflow/ti_deps/dep_context.py | 12 ++++-------- 2 files changed, 24 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15ff540e/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 22cdeb0..229424e 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py 
@@ -45,7 +45,7 @@ from airflow import configuration as conf from airflow.exceptions import AirflowException from airflow.models import DagRun from airflow.settings import Stats -from airflow.ti_deps.dep_context import RUN_DEPS, DepContext +from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS from airflow.utils.state import State from airflow.utils.db import provide_session, pessimistic_connection_handling from airflow.utils.dag_processing import (AbstractDagFileProcessor, @@ -1532,9 +1532,25 @@ class SchedulerJob(BaseJob): dag = dagbag.dags[ti_key[0]] task = dag.get_task(ti_key[1]) ti = models.TaskInstance(task, ti_key[2]) - # Task starts out in the scheduled state. All tasks in the - # scheduled state will be sent to the executor - ti.state = State.SCHEDULED + + ti.refresh_from_db(session=session, lock_for_update=True) + # We can defer checking the task dependency checks to the worker themselves + # since they can be expensive to run in the scheduler. + dep_context = DepContext(deps=QUEUE_DEPS, ignore_task_deps=True) + + # Only schedule tasks that have their dependencies met, e.g. to avoid + # a task that recently got it's state changed to RUNNING from somewhere + # other than the scheduler from getting it's state overwritten. + # TODO(aoen): It's not great that we have to check all the task instance + # dependencies twice; once to get the task scheduled, and again to actually + # run the task. We should try to come up with a way to only check them once. + if ti.are_dependencies_met( + dep_context=dep_context, + session=session, + verbose=True): + # Task starts out in the scheduled state. All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED # Also save this task instance to the DB. 
self.logger.info("Creating / updating {} in ORM".format(ti)) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15ff540e/airflow/ti_deps/dep_context.py ---------------------------------------------------------------------- diff --git a/airflow/ti_deps/dep_context.py b/airflow/ti_deps/dep_context.py index 73ae924..583099d 100644 --- a/airflow/ti_deps/dep_context.py +++ b/airflow/ti_deps/dep_context.py @@ -81,17 +81,13 @@ QUEUEABLE_STATES = { State.UP_FOR_RETRY, } -# The minimum execution context for task instances to be executed. -MIN_EXEC_DEPS = { +# Context to get the dependencies that need to be met in order for a task instance to +# be backfilled. +QUEUE_DEPS = { NotRunningDep(), NotSkippedDep(), RunnableExecDateDep(), -} - -# Context to get the dependencies that need to be met in order for a task instance to -# be backfilled. -QUEUE_DEPS = MIN_EXEC_DEPS | { - ValidStateDep(QUEUEABLE_STATES) + ValidStateDep(QUEUEABLE_STATES), } # Dependencies that need to be met for a given task instance to be able to get run by an