Same :)

On Fri, May 6, 2016 at 1:57 PM, Arthur Wiedmer <[email protected]> wrote:
> Good, because that's the version we use :)
>
> On Fri, May 6, 2016 at 1:55 PM, Bolke de Bruin <[email protected]> wrote:
>
> > It seems it does: https://dev.mysql.com/doc/refman/5.6/en/select.html
> >
> > Bolke
> >
> > Sent from my iPhone
> >
> > On 6 May 2016, at 22:52, Chris Riccomini <[email protected]> wrote:
> >
> > >> Postgres, Mysql >= 5.7)
> > >
> > > Curious--does this work in MySQL 5.6?
> > >
> > >> On Fri, May 6, 2016 at 1:36 PM, Bolke de Bruin <[email protected]> wrote:
> > >>
> > >> Dear All,
> > >>
> > >> One of the remaining issues to resolve is Airflow-56: the fact that
> > >> Airflow’s scheduler can “lose” queued tasks. One of the recent changes
> > >> was to hold the queue status in the executor instead of using the
> > >> database. While this goes against “the database knows the state”, it was
> > >> done (afaik) to prevent a race condition in which the scheduler might
> > >> schedule a task for execution twice, because the task might not yet have
> > >> updated its state in the database. However, if the executor loses its
> > >> queue for some reason, the tasks that have a state of queued in the
> > >> database will never get picked up. Airflow-56 contains a DAG that
> > >> exhibits this behavior on a clean install of 1.7.1rc3 with MySQL and
> > >> the local executor.
> > >>
> > >> Jeremiah has worked on
> > >> https://github.com/apache/incubator-airflow/pull/1378, which basically
> > >> reverts the behavior to using the database (pre 1.7.0), but contains
> > >> some additional fixes.
> > >> In addition, I have asked him to include the following:
> > >>
> > >>     @provide_session
> > >>     def refresh_from_db(self, session=None, lock_for_update=False):
> > >>         """
> > >>         Refreshes the task instance from the database based on the primary key
> > >>         """
> > >>         TI = TaskInstance
> > >>
> > >>         if lock_for_update:
> > >>             ti = session.query(TI).filter(
> > >>                 TI.dag_id == self.dag_id,
> > >>                 TI.task_id == self.task_id,
> > >>                 TI.execution_date == self.execution_date,
> > >>             ).with_for_update().first()
> > >>         else:
> > >>             ti = session.query(TI).filter(
> > >>                 TI.dag_id == self.dag_id,
> > >>                 TI.task_id == self.task_id,
> > >>                 TI.execution_date == self.execution_date,
> > >>             ).first()
> > >>
> > >> and to use “lock_for_update=True” in TaskInstance.run. This locks the
> > >> record for update (with a FOR UPDATE) in the database (Postgres, MySQL >=
> > >> 5.7) and ensures that a “select” has to wait if it includes this record
> > >> in its results. As soon as a “session.commit()” occurs, the lock is
> > >> released. The window for the race condition to occur is thus much
> > >> shorter now (the time between a new scheduler run and the time it takes
> > >> for the Python interpreter to start “airflow run”), though not entirely
> > >> closed.
> > >>
> > >> Due to the lock there *might* be a small performance hit, but it should
> > >> be very small: only queries that include the aforementioned locked
> > >> record will be delayed for a few milliseconds. However, we might
> > >> overlook something, so I kindly request a review of this PR so we can
> > >> make it part of 1.7.1.
> > >>
> > >> Thanks
> > >> Bolke
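To illustrate the locking idea in the thread, here is a toy model in plain Python (no database, no SQLAlchemy). It is a sketch under stated assumptions, not the code in PR 1378: `ToyTaskTable`, `try_run` and `simulate` are hypothetical names, and a per-row `threading.Lock` stands in for the row lock that `SELECT ... FOR UPDATE` (what `Query.with_for_update()` adds in the quoted snippet) holds until the transaction commits. Each worker reads the task state and marks it running only if it is still queued; with the lock, the second reader has to wait and then sees the updated state.

```python
import threading
import time

QUEUED, RUNNING = "queued", "running"


class ToyTaskTable:
    """Hypothetical in-memory stand-in for the task_instance table.

    One lock per row mimics the row lock taken by SELECT ... FOR UPDATE,
    which the database holds until the transaction commits.
    """

    def __init__(self, rows):
        self.states = dict(rows)
        self._row_locks = {key: threading.Lock() for key in self.states}

    def select(self, key, lock_for_update=False):
        if lock_for_update:
            # Blocks while another "transaction" holds this row's lock.
            self._row_locks[key].acquire()
        return self.states[key]

    def update(self, key, state):
        self.states[key] = state

    def commit(self, key, locked):
        # Committing releases any row lock held by this "transaction".
        if locked:
            self._row_locks[key].release()


def try_run(table, key, worker, ran_by, lock_for_update):
    """Hypothetical worker: run the task only if it is still queued."""
    state = table.select(key, lock_for_update=lock_for_update)
    try:
        if state != QUEUED:
            return  # another worker already claimed the task
        time.sleep(0.05)  # widen the gap between the read and the write
        table.update(key, RUNNING)
        ran_by.append(worker)
    finally:
        table.commit(key, locked=lock_for_update)


def simulate(lock_for_update):
    """Start two workers on the same task instance; return who ran it."""
    key = ("example_dag", "example_task", "2016-05-06")  # made-up primary key
    table = ToyTaskTable({key: QUEUED})
    ran_by = []
    threads = [
        threading.Thread(
            target=try_run, args=(table, key, name, ran_by, lock_for_update)
        )
        for name in ("worker-1", "worker-2")
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return ran_by


if __name__ == "__main__":
    print("with FOR UPDATE:   ", simulate(lock_for_update=True))
    print("without FOR UPDATE:", simulate(lock_for_update=False))
```

With lock_for_update=True exactly one worker runs the task; without the lock, both workers will typically read "queued" during the sleep and both run it. The model does not cover the remaining window mentioned in the thread (between a new scheduler run and the `airflow run` process starting), so it only shows why the lock narrows the race, not why it cannot fully close it.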
