Good, because that's the version we use :)

On Fri, May 6, 2016 at 1:55 PM, Bolke de Bruin <[email protected]> wrote:
> It seems it does: https://dev.mysql.com/doc/refman/5.6/en/select.html
>
> Bolke
>
> On 6 May 2016, at 22:52, Chris Riccomini <[email protected]> wrote:
>
> >> Postgres, Mysql >= 5.7)
> >
> > Curious--does this work in MySQL 5.6?
> >
> >> On Fri, May 6, 2016 at 1:36 PM, Bolke de Bruin <[email protected]> wrote:
> >>
> >> Dear All,
> >>
> >> One of the remaining issues to resolve is Airflow-56: the fact that
> >> airflow’s scheduler can “lose” queued tasks. One of the recent changes
> >> was to hold queue status in the executor instead of using the database.
> >> While this goes against the “database knows the state” principle, it
> >> was done (afaik) to prevent a race condition where a scheduler might
> >> actually set the task twice for execution, as the task might not have
> >> updated its state in the database yet. However, if the executor loses
> >> its queue for some reason, the tasks that have a state of queued in the
> >> database will never get picked up. Airflow-56 contains a DAG that will
> >> exhibit this behavior on a clean install of 1.7.1rc3 with mysql and
> >> local executor.
> >>
> >> Jeremiah has worked on
> >> https://github.com/apache/incubator-airflow/pull/1378, which basically
> >> reverts the behavior to using the database (pre 1.7.0), but contains
> >> some additional fixes.
> >> In addition I have asked him to include the following:
> >>
> >>     @provide_session
> >>     def refresh_from_db(self, session=None, lock_for_update=False):
> >>         """
> >>         Refreshes the task instance from the database based on the
> >>         primary key
> >>         """
> >>         TI = TaskInstance
> >>
> >>         if lock_for_update:
> >>             ti = session.query(TI).filter(
> >>                 TI.dag_id == self.dag_id,
> >>                 TI.task_id == self.task_id,
> >>                 TI.execution_date == self.execution_date,
> >>             ).with_for_update().first()
> >>         else:
> >>             ti = session.query(TI).filter(
> >>                 TI.dag_id == self.dag_id,
> >>                 TI.task_id == self.task_id,
> >>                 TI.execution_date == self.execution_date,
> >>             ).first()
> >>
> >> And to use “lock_for_update=True” in TaskInstance.run. This locks the
> >> record for update (with a FOR UPDATE) in the database (Postgres, Mysql
> >> >= 5.7) and ensures a “select” needs to wait if it includes this record
> >> in its results. As soon as a “session.commit()” occurs the lock is
> >> gone. The window for the race condition to occur is thus much shorter
> >> now (the time between a new scheduler run and the time it takes for the
> >> python interpreter to start “airflow run”), though not entirely closed.
> >>
> >> Due to the lock there *might* be a small performance hit, but it should
> >> be very small: only queries that include the aforementioned locked
> >> record will be delayed for a few milliseconds. However, we might have
> >> overlooked something, so I kindly request a review of this PR, so we
> >> can make it part of 1.7.1.
> >>
> >> Thanks
> >> Bolke
