Dear All,

One of the remaining issues to resolve is Airflow-56: the fact that Airflow’s 
scheduler can “lose” queued tasks. One of the recent changes was to hold the 
queued state in the executor instead of in the database. While this goes 
against the “database knows the state” principle, it was done (afaik) to 
prevent a race condition in which the scheduler might send a task for 
execution twice, because the task might not yet have updated its state in the 
database. However, if the executor loses its queue for some reason, tasks that 
have a state of “queued” in the database will never get picked up. Airflow-56 
contains a DAG that exhibits this behavior on a clean install of 1.7.1rc3 with 
MySQL and the LocalExecutor.
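
To make the race concrete, here is a minimal sketch of the problem. This is 
not the actual scheduler code: the names RUNNABLE_STATES and 
queue_task_instance are hypothetical, and it assumes a SQLAlchemy session and 
Airflow’s TaskInstance model are in scope.

# Sketch of the race condition, not actual Airflow code.
# Pass 1 hands the task to the executor; before "airflow run"
# starts and commits a new state, pass 2 reads the same stale
# row and hands the task over again.
def scheduler_pass(session, executor):
    runnable = session.query(TaskInstance).filter(
        TaskInstance.state.in_(RUNNABLE_STATES)  # stale until the task commits
    ).all()
    for ti in runnable:
        executor.queue_task_instance(ti)

# Keeping the queued set inside the executor prevents this double
# submission, but if the executor loses its in-memory queue, rows
# left at "queued" in the database are never picked up again.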

Jeremiah has worked on https://github.com/apache/incubator-airflow/pull/1378, 
which basically reverts the behavior to using the database (as before 1.7.0), 
but contains some additional fixes. In addition, I have asked him to include 
the following:

@provide_session
def refresh_from_db(self, session=None, lock_for_update=False):
    """
    Refreshes the task instance from the database based on the primary key.

    :param lock_for_update: if True, issue a SELECT ... FOR UPDATE so the
        row stays locked until the session is committed
    """
    TI = TaskInstance

    qry = session.query(TI).filter(
        TI.dag_id == self.dag_id,
        TI.task_id == self.task_id,
        TI.execution_date == self.execution_date,
    )

    if lock_for_update:
        ti = qry.with_for_update().first()
    else:
        ti = qry.first()

    # (the remainder of the method copies ti's fields, e.g. its state,
    # back onto self)

And to use “lock_for_update=True” in TaskInstance.run. This locks the record 
(with a SELECT ... FOR UPDATE) in the database (Postgres, MySQL >= 5.7) and 
ensures that any other locking read or update that touches this record must 
wait. As soon as a “session.commit()” occurs, the lock is released. The window 
for the race condition to occur is thus much shorter now (the time between a 
new scheduler run and the time it takes for the Python interpreter to start 
“airflow run”), though not entirely closed.
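
For illustration, here is a minimal sketch of how TaskInstance.run could take 
the lock before committing itself to running the task. This is an assumed 
flow, not the exact implementation in the PR; it presumes Airflow’s State 
constants and a SQLAlchemy session are available.

def run(self, session=None):
    # SELECT ... FOR UPDATE: a concurrent locking read of this row
    # (e.g. a second "airflow run") blocks here until we commit.
    self.refresh_from_db(session=session, lock_for_update=True)

    if self.state == State.RUNNING:
        session.commit()   # releases the row lock
        return             # someone else already started this task

    self.state = State.RUNNING
    session.merge(self)
    session.commit()       # lock released; new state visible to everyone
    # ... actually execute the task ...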

Due to the lock there *might* be a small performance hit, but it should be 
very small: only locking queries that touch the locked record will be delayed, 
and only for a few milliseconds. However, we might have overlooked something, 
so I kindly ask you to review this PR so we can make it part of 1.7.1.

Thanks
Bolke

