[ https://issues.apache.org/jira/browse/AIRFLOW-585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ash Berlin-Taylor resolved AIRFLOW-585.
---------------------------------------
Resolution: Done
> Fix race condition in backfill execution loop
> ---------------------------------------------
>
> Key: AIRFLOW-585
> URL: https://issues.apache.org/jira/browse/AIRFLOW-585
> Project: Apache Airflow
> Issue Type: Bug
> Components: executor, travis
> Environment: LocalExecutor, CeleryExecutor
> Reporter: Vijay Bhat
> Assignee: Vijay Bhat
> Priority: Major
>
> I found this bug while investigating random Travis build failures. The root
> cause is a subtle race condition in the backfill execution loop that
> intermittently produces spurious deadlock failures.
> *Analysis:*
> The common pattern I was seeing in the random build failures was a deadlock
> in an arbitrary backfill job. Example below:
> ======================================================================
> ERROR: test_backfill_multi_dates (tests.BackfillJobTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 100, in test_backfill_multi_dates
>     job.run()
>   File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 194, in run
>     self._execute()
>   File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 1894, in _execute
>     raise AirflowException(err)
> nose.proxy.AirflowException:
> ---------------------------------------------------
> BackfillJob is deadlocked. These tasks were unable to run:
> {<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>,
>  <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}
> After digging into the backfill execution code and adding extensive logging, I
> found a race condition in the main backfill execution loop
> (BackfillJob._execute) for a DAG run:
> # Triggering what is ready to get triggered
> while tasks_to_run and not deadlocked:
>     not_ready.clear()
>     for key, ti in list(tasks_to_run.items()):
>         ti.refresh_from_db(session=session, lock_for_update=True)
>         ...
>     ...
>
> # update dag run state
> run.update_state(session=session)
> if run.dag.is_paused:
>     models.DagStat.clean_dirty([run.dag_id], session=session)
> The problem is that the state of all task instances for a DAG run is not read
> atomically in the loop: each task instance is refreshed piecemeal via the
> ti.refresh_from_db call above as the loop executes. In a multiprocessing
> scenario (like LocalExecutor), this leaves the door open to spuriously
> detecting a deadlock state and failing the backfill.
> Here's an example sequence of events that can cause this failure. Say we have
> a DAG with tasks A and B, where B depends on A (A -> B). A has been picked up
> by a worker but has not completed, so B is not yet ready to run. The backfill
> / local executor process is actively running.
> 1. tasks_to_run is read as [B, A] in BackfillJob._execute.
> 2. In the while loop, B is inspected first and is correctly identified as not
> runnable (since A hasn't succeeded yet). B is added to not_ready, so
> not_ready = [B].
> 3. The backfill / local executor process gets interrupted and control passes
> to the worker process, which runs task A and marks it complete in the DB (in
> the TaskInstance run method).
> 4. Control returns to the backfill / local executor process, which goes on to
> inspect task A. It calls ti.refresh_from_db, finds A is complete, and pops it
> off tasks_to_run. Now tasks_to_run = [B].
> 5. The following code segment in the loop incorrectly marks the DAG run as
> deadlocked, and the backfill job fails:
> # If the set of tasks that aren't ready ever equals the set of
> # tasks to run, then the backfill is deadlocked
> if not_ready and not_ready == set(tasks_to_run):
>     deadlocked.update(tasks_to_run.values())
>     tasks_to_run.clear()
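> To make the interleaving concrete, here is a minimal, self-contained sketch
> (toy code, not Airflow internals; the fake_db dict and the task names are
> hypothetical) that replays steps 1-5 above and trips the same deadlock check:
>
> # Toy replay of the race. fake_db stands in for the metastore;
> # B depends on A, and A is still running when the loop starts.
> fake_db = {"A": "running", "B": None}
> tasks_to_run = {"B": "B", "A": "A"}  # step 1: inspected in order [B, A]
> not_ready = set()
> deadlocked = set()
>
> for key in list(tasks_to_run):
>     state = fake_db[key]  # per-task refresh, as in ti.refresh_from_db
>     if key == "B":
>         # Step 2: A hasn't succeeded yet, so B is not runnable.
>         not_ready.add(key)
>         # Step 3: the worker finishes A between loop iterations.
>         fake_db["A"] = "success"
>     elif state == "success":
>         # Step 4: A is already complete, so it is popped without running.
>         tasks_to_run.pop(key)
>
> # Step 5: not_ready == {"B"} == set(tasks_to_run) -> spurious deadlock.
> if not_ready and not_ready == set(tasks_to_run):
>     deadlocked.update(tasks_to_run.values())
>     tasks_to_run.clear()
>
> print(deadlocked)  # {'B'}, even though B would become runnable next pass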
> *How to fix:*
> The main reason for the race condition is that we are not synchronizing
> access to the task instances in a DAG run.
> There are 3+n actors in the backfill system:
> * Backfill loop
> * LocalExecutor object
> * Metastore DB
> * n LocalWorker processes
> The backfill loop and local executor run in the same process, so we don't
> have to worry about synchronization between them. But we need to synchronize
> access between the other actors. The channels of communication in this
> context are:
> * Backfill loop <-> LocalExecutor = event_buffer
> * Backfill loop <-> Metastore = SQLAlchemy ORM
> * Metastore <-> LocalWorker = SQLAlchemy ORM
> This means the backfill loop sees two versions of the task instance state:
> one from the LocalExecutor event buffer (which gets updated when a worker
> completes a task) and another from the metastore (which the workers also
> write to).
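> As a rough illustration of the two channels (variable names as in
> BackfillJob._execute; shown only to contrast the two reads, not as a patch):
>
> # The two places the backfill loop can learn a task instance's state.
> # Any window where they disagree is the race described above.
>
> # Channel 1: in-process event buffer pushed by the LocalExecutor.
> for key, state in list(executor.get_event_buffer().items()):
>     pass  # state as the executor last reported it
>
> # Channel 2: the metastore, via the SQLAlchemy ORM (workers write here).
> ti.refresh_from_db(session=session)  # state as currently committed in the DB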
> If we consider the metastore to be the source of truth, we can synchronize
> access by reading the state of all task instances for the DAG run in a single
> query before the "for key, ti in list(tasks_to_run.items())" loop and
> removing individual task instance refreshes inside the loop.
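> A minimal sketch of that change (illustrative, not the merged patch; the
> states snapshot and the exact query shape are assumptions layered on the
> existing session/run/tasks_to_run variables):
>
> # Read every task instance state for this DAG run in ONE query per pass,
> # then consult that snapshot instead of refreshing each TI individually.
> TI = models.TaskInstance
> while tasks_to_run and not deadlocked:
>     not_ready.clear()
>     refreshed = (
>         session.query(TI)
>         .filter(TI.dag_id == run.dag_id,
>                 TI.execution_date == run.execution_date)
>         .all()
>     )
>     states = {ti.task_id: ti.state for ti in refreshed}  # atomic snapshot
>     for key, ti in list(tasks_to_run.items()):
>         ti.state = states.get(ti.task_id, ti.state)  # no per-TI refresh here
>         ...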
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)