Good, because that's the version we use :)

On Fri, May 6, 2016 at 1:55 PM, Bolke de Bruin <[email protected]> wrote:

> It seems it does: https://dev.mysql.com/doc/refman/5.6/en/select.html
>
> Bolke
>
> Sent from my iPhone
>
> On 6 mei 2016, at 22:52, Chris Riccomini <[email protected]> wrote:
>
> >> Postgres, Mysql >= 5.7)
> >
> > Curious--does this work in MySQL 5.6?
> >
> >> On Fri, May 6, 2016 at 1:36 PM, Bolke de Bruin <[email protected]>
> wrote:
> >>
> >> Dear All,
> >>
> >> One of the remaining issues to resolve is Airflow-56: the fact that
> >> airflow’s scheduler can “lose” queued tasks. One of the recent changes
> was
> >> the hold queue status in the executor instead using the database. While
> >> this goes against the “database knows the state”, it was done (afaik) to
> >> prevent a race condition when a scheduler might actually set the task
> twice
> >> for execution as the task might not have updated its state in the
> database
> >> yet. However, if the executor loses its queue for some reason the tasks
> >> that have a state of queued in the database will never get picked up.
> >> Airflow-56 contains a DAG that will exhibit this behavior on clean
> install
> >> of 1.7.1rc3 with mysql and local executor.
> >>
> >> Jeremiah has worked on
> >> https://github.com/apache/incubator-airflow/pull/1378, that basically
> >> reverts the behavior to using the database (pre 1.7.0), but contains
> some
> >> additional fixes. In addition I have asked him to include the following:
> >>
> >> @provide_session
> >> def refresh_from_db(self, session=None, lock_for_update=False):
> >>    """
> >>    Refreshes the task instance from the database based on the primary
> key
> >>    """
> >>    TI = TaskInstance
> >>
> >>    if lock_for_update:
> >>        ti = session.query(TI).filter(
> >>            TI.dag_id == self.dag_id,
> >>            TI.task_id == self.task_id,
> >>            TI.execution_date == self.execution_date,
> >>        ).with_for_update().first()
> >>    else:
> >>        ti = session.query(TI).filter(
> >>            TI.dag_id == self.dag_id,
> >>            TI.task_id == self.task_id,
> >>            TI.execution_date == self.execution_date,
> >>        ).first()
> >>
> >> And to use “lock_for_update=True” in TaskInstance.run. This locks the
> >> record for update (with a FOR UPDATE) in the database (Postgres, Mysql
> >=
> >> 5.7) and ensures a “select” needs to wait if it includes this record in
> its
> >> results. As soon as a “session.commit()” occurs the lock is gone. The
> >> window of for the race condition to occur is this much shorter now (the
> >> time between a new scheduler run and the time it takes for the python
> >> interpreter to start “airflow run”) though not entirely closed.
> >>
> >> Due to the lock their *might* be a small performance hit, but it should
> be
> >> very small: only queries that include the aforementioned locked record
> will
> >> be delayed for a few milliseconds. However, we might overlook something
> so
> >> I kindly request to review this PR, so we can make it part of 1.7.1.
> >>
> >> Thanks
> >> Bolke
> >>
> >>
> >>
>

Reply via email to