Dear All,

One of the remaining issues to resolve is Airflow-56: Airflow’s scheduler can “lose” queued tasks. One of the recent changes was to hold the queued status in the executor instead of in the database. While this goes against the principle that “the database knows the state”, it was done (afaik) to prevent a race condition in which the scheduler could send the same task for execution twice, because the task might not have updated its state in the database yet. However, if the executor loses its queue for some reason, tasks that have a state of queued in the database will never get picked up. Airflow-56 contains a DAG that exhibits this behavior on a clean install of 1.7.1rc3 with MySQL and the local executor.
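To make the failure mode concrete, here is a toy model using only Python's standard-library sqlite3. It is not Airflow's executor or scheduler code: the table, the class InMemoryQueueExecutor and the helper functions are hypothetical. It only shows why a queued status kept solely in the executor's memory disappears when that memory is lost, while a query against the database still finds the task instances marked queued.

```python
import sqlite3


def make_db():
    """In-memory stand-in for the metadata DB: two task instances marked queued."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task_instance (task_id TEXT PRIMARY KEY, state TEXT)")
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, 'queued')", [("t1",), ("t2",)]
    )
    conn.commit()
    return conn


class InMemoryQueueExecutor:
    """Hypothetical executor that remembers queued tasks only in process memory."""

    def __init__(self):
        self.queued = set()

    def queue(self, task_id):
        self.queued.add(task_id)

    def tasks_to_run(self):
        # Only tasks this executor object remembers are ever dispatched.
        return sorted(self.queued)


def tasks_to_run_from_db(conn):
    # Database-driven alternative: whatever is marked queued in the DB is picked up.
    rows = conn.execute("SELECT task_id FROM task_instance WHERE state = 'queued'")
    return sorted(task_id for (task_id,) in rows)


def demo():
    conn = make_db()
    executor = InMemoryQueueExecutor()
    executor.queue("t1")
    executor.queue("t2")
    # Simulate the executor losing its queue (e.g. a restart);
    # the queued state now survives only in the database.
    executor = InMemoryQueueExecutor()
    from_memory = executor.tasks_to_run()
    from_db = tasks_to_run_from_db(conn)
    conn.close()
    return from_memory, from_db


if __name__ == "__main__":
    print(demo())  # ([], ['t1', 't2'])
```

The in-memory view comes back empty after the simulated loss, while the database view still lists both task instances, which is the motivation for going back to the database as the source of truth.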

Jeremiah has worked on https://github.com/apache/incubator-airflow/pull/1378, which basically reverts the behavior to using the database (as before 1.7.0) but also contains some additional fixes. In addition, I have asked him to include the following:

    @provide_session
    def refresh_from_db(self, session=None, lock_for_update=False):
        """
        Refreshes the task instance from the database based on the primary key
        """
        TI = TaskInstance
        qry = session.query(TI).filter(
            TI.dag_id == self.dag_id,
            TI.task_id == self.task_id,
            TI.execution_date == self.execution_date,
        )
        if lock_for_update:
            # Emits SELECT ... FOR UPDATE; the row stays locked until session.commit()
            ti = qry.with_for_update().first()
        else:
            ti = qry.first()
        # ... rest of the method unchanged (copies the fetched values onto self)

and to use “lock_for_update=True” in TaskInstance.run. This locks the record for update (with a SELECT ... FOR UPDATE) in the database (Postgres, MySQL >= 5.7), so any other transaction that tries to lock or update the same record has to wait. As soon as a “session.commit()” occurs, the lock is released. The window for the race condition to occur is now much shorter (the time between a new scheduler run and the time it takes for the Python interpreter to start “airflow run”), though not entirely closed.

Due to the lock there *might* be a small performance hit, but it should be very small: only queries that need to lock or modify the aforementioned record will be delayed, by a few milliseconds. However, we might be overlooking something, so I kindly request that you review this PR, so we can make it part of 1.7.1.

Thanks,
Bolke
