Add logic to lock the DB and avoid a race condition. The scheduler can encounter a queued task twice before the task actually starts to run; this change locks the task instance row (issuing a FOR UPDATE clause until the session is committed) and avoids that condition.
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/77c7bc4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/77c7bc4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/77c7bc4a Branch: refs/heads/airbnb_rb1.7.1_3 Commit: 77c7bc4ac11513b63fd82d9dd9b2a98e13ff06e0 Parents: edc718b Author: jlowin <[email protected]> Authored: Fri May 6 15:05:33 2016 -0400 Committer: Dan Davydov <[email protected]> Committed: Mon May 9 16:26:56 2016 -0700 ---------------------------------------------------------------------- airflow/models.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/77c7bc4a/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index c3a01b0..58d6fd8 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -775,16 +775,25 @@ class TaskInstance(Base): session.commit() @provide_session - def refresh_from_db(self, session=None): + def refresh_from_db(self, session=None, lock_for_update=False): """ Refreshes the task instance from the database based on the primary key + + :param lock_for_update: if True, indicates that the database should + lock the TaskInstance (issuing a FOR UPDATE clause) until the session + is committed. 
""" TI = TaskInstance - ti = session.query(TI).filter( + + qry = session.query(TI).filter( TI.dag_id == self.dag_id, TI.task_id == self.task_id, - TI.execution_date == self.execution_date, - ).first() + TI.execution_date == self.execution_date) + + if lock_for_update: + ti = qry.with_for_update().first() + else: + ti = qry.first() if ti: self.state = ti.state self.start_date = ti.start_date @@ -1117,7 +1126,7 @@ class TaskInstance(Base): self.pool = pool or task.pool self.test_mode = test_mode self.force = force - self.refresh_from_db() + self.refresh_from_db(session=session, lock_for_update=True) self.clear_xcom_data() self.job_id = job_id iso = datetime.now().isoformat()
