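Add logic to lock the task instance row in the DB and avoid a race condition

The scheduler can encounter a queued task twice before the task actually starts to run. This change locks the task instance row (by issuing a FOR UPDATE clause when refreshing it from the database) and thereby avoids that condition.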
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c1aa93f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c1aa93f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c1aa93f1 Branch: refs/heads/master Commit: c1aa93f1a7c9dbe88889b78b541b3abe05ded081 Parents: 43bdd7a Author: jlowin <[email protected]> Authored: Fri May 6 15:05:33 2016 -0400 Committer: jlowin <[email protected]> Committed: Mon May 9 17:19:02 2016 -0400 ---------------------------------------------------------------------- airflow/models.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c1aa93f1/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index a1b17ac..7549325 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -817,16 +817,25 @@ class TaskInstance(Base): session.commit() @provide_session - def refresh_from_db(self, session=None): + def refresh_from_db(self, session=None, lock_for_update=False): """ Refreshes the task instance from the database based on the primary key + + :param lock_for_update: if True, indicates that the database should + lock the TaskInstance (issuing a FOR UPDATE clause) until the session + is committed. 
""" TI = TaskInstance - ti = session.query(TI).filter( + + qry = session.query(TI).filter( TI.dag_id == self.dag_id, TI.task_id == self.task_id, - TI.execution_date == self.execution_date, - ).first() + TI.execution_date == self.execution_date) + + if lock_for_update: + ti = qry.with_for_update().first() + else: + ti = qry.first() if ti: self.state = ti.state self.start_date = ti.start_date @@ -1159,7 +1168,7 @@ class TaskInstance(Base): self.pool = pool or task.pool self.test_mode = test_mode self.force = force - self.refresh_from_db() + self.refresh_from_db(session=session, lock_for_update=True) self.clear_xcom_data() self.job_id = job_id iso = datetime.now().isoformat()
