> Postgres, MySQL >= 5.7)

Curious--does this work in MySQL 5.6?
On Fri, May 6, 2016 at 1:36 PM, Bolke de Bruin <[email protected]> wrote:

> Dear All,
>
> One of the remaining issues to resolve is AIRFLOW-56: the fact that
> Airflow’s scheduler can “lose” queued tasks. One of the recent changes was
> to hold queue status in the executor instead of using the database. While
> this goes against “the database knows the state”, it was done (afaik) to
> prevent a race condition where a scheduler might actually set the task
> twice for execution, as the task might not have updated its state in the
> database yet. However, if the executor loses its queue for some reason,
> the tasks that have a state of “queued” in the database will never get
> picked up. AIRFLOW-56 contains a DAG that will exhibit this behavior on a
> clean install of 1.7.1rc3 with MySQL and the local executor.
>
> Jeremiah has worked on
> https://github.com/apache/incubator-airflow/pull/1378, which basically
> reverts the behavior to using the database (pre 1.7.0), but contains some
> additional fixes. In addition, I have asked him to include the following:
>
> @provide_session
> def refresh_from_db(self, session=None, lock_for_update=False):
>     """
>     Refreshes the task instance from the database based on the primary key
>     """
>     TI = TaskInstance
>
>     if lock_for_update:
>         ti = session.query(TI).filter(
>             TI.dag_id == self.dag_id,
>             TI.task_id == self.task_id,
>             TI.execution_date == self.execution_date,
>         ).with_for_update().first()
>     else:
>         ti = session.query(TI).filter(
>             TI.dag_id == self.dag_id,
>             TI.task_id == self.task_id,
>             TI.execution_date == self.execution_date,
>         ).first()
>
> and to use “lock_for_update=True” in TaskInstance.run. This locks the
> record for update (with a FOR UPDATE) in the database (Postgres, MySQL >=
> 5.7) and ensures a “select” has to wait if it includes this record in its
> results. As soon as a “session.commit()” occurs, the lock is gone. The
> window for the race condition to occur is thus much shorter now (the time
> between a new scheduler run and the time it takes for the python
> interpreter to start “airflow run”), though not entirely closed.
>
> Due to the lock there *might* be a small performance hit, but it should be
> very small: only queries that include the aforementioned locked record
> will be delayed by a few milliseconds. However, we might have overlooked
> something, so I kindly request a review of this PR, so we can make it part
> of 1.7.1.
>
> Thanks
> Bolke
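
Aside, for anyone who wants to see the blocking behavior in isolation:
here is a minimal standalone sketch (the Task table, its columns, and the
connection URL are made up for illustration; this is not Airflow's schema).
A second session's locking read of the same row blocks until the first
session commits, which is the property the PR relies on:

    # Standalone sketch, not Airflow code: table, columns, and connection
    # URL are hypothetical. Needs a backend whose dialect actually renders
    # FOR UPDATE (Postgres, MySQL/InnoDB); SQLite silently ignores it.
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()

    class Task(Base):
        __tablename__ = "task_demo"   # hypothetical table
        id = Column(Integer, primary_key=True)
        state = Column(String(20))

    engine = create_engine("postgresql://localhost/demo")  # assumed URL
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    s1, s2 = Session(), Session()
    s1.add(Task(id=1, state="queued"))
    s1.commit()

    # Session 1 takes the row lock: with_for_update() appends FOR UPDATE
    # to the emitted SELECT.
    row = s1.query(Task).filter(Task.id == 1).with_for_update().first()
    row.state = "running"

    # If another thread/process ran this now, it would block until s1
    # commits, and would then see state == "running" instead of "queued":
    #   row2 = s2.query(Task).filter(Task.id == 1).with_for_update().first()

    s1.commit()  # FOR UPDATE lock released here; waiting readers proceed

The commented-out read in the middle is the scheduler's situation: a
competing locking select cannot return the stale “queued” row while another
session holds the lock, it has to wait for the commit and then sees the
updated state.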
