[ https://issues.apache.org/jira/browse/AIRFLOW-585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor resolved AIRFLOW-585.
---------------------------------------
    Resolution: Done

> Fix race condition in backfill execution loop
> ---------------------------------------------
>
>                 Key: AIRFLOW-585
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-585
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: executor, travis
>         Environment: LocalExecutor, CeleryExecutor
>            Reporter: Vijay Bhat
>            Assignee: Vijay Bhat
>            Priority: Major
>
> I found this bug while investigating intermittent Travis build failures. The 
> root cause is a subtle race condition in the backfill execution loop that 
> occasionally makes a backfill job report a spurious deadlock.
> *Analysis:*
> The common pattern in these intermittent build failures was a reported 
> deadlock in an arbitrary backfill job. Example below:
> ========================================================
> ERROR: test_backfill_multi_dates (tests.BackfillJobTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/home/travis/build/apache/incubator-airflow/tests/jobs.py", line 100, in test_backfill_multi_dates
>     job.run()
>   File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 194, in run
>     self._execute()
>   File "/home/travis/build/apache/incubator-airflow/airflow/jobs.py", line 1894, in _execute
>     raise AirflowException(err)
> nose.proxy.AirflowException:
> ---------------------------------------------------
> BackfillJob is deadlocked. These tasks were unable to run:
> {<TaskInstance: example_bash_operator.run_after_loop 2016-01-02 00:00:00 [None]>, <TaskInstance: example_bash_operator.run_this_last 2016-01-02 00:00:00 [None]>}
> After digging into the backfill execution code and adding extensive logging, 
> I found a race condition in the main backfill execution loop 
> (BackfillJob._execute) for a DAG run:
> # Triggering what is ready to get triggered
> while tasks_to_run and not deadlocked:
>     not_ready.clear()
>     for key, ti in list(tasks_to_run.items()):
>         # per-task refresh: not atomic across the DAG run
>         ti.refresh_from_db(session=session, lock_for_update=True)
>         ...
>     ...
> # update dag run state
> run.update_state(session=session)
> if run.dag.is_paused:
>     models.DagStat.clean_dirty([run.dag_id], session=session)
> The problem is that the states of the task instances in a DAG run are not 
> read atomically: each one is refreshed individually (the ti.refresh_from_db 
> call inside the for loop above) as the loop executes. When tasks run in other 
> processes (as with LocalExecutor), a task's state can change between two of 
> these refreshes, which leaves the door open to spuriously detecting a 
> deadlock and failing the job.
> Here's an example sequence of events that can cause this failure. Suppose a 
> DAG has tasks A and B, with B depending on A (A -> B). A has been picked up 
> by a worker but has not completed, so B is not yet ready to run. The 
> backfill / local executor process is actively running.
> 1. Let tasks_to_run be read as [B, A] in BackfillJob._execute
> 2. In the while loop, B is inspected first, and it's correctly identified as 
> not runnable (since A hasn't succeeded yet). B is added to not_ready. Now, 
> not_ready = [B]
> 3. The backfill / local executor process is preempted, and the worker 
> process finishes running task A and marks it as complete in the DB (in the 
> TaskInstance run method).
> 4. Control returns to the backfill / local executor process, which goes on 
> to inspect task A. It calls ti.refresh_from_db, finds that A is complete, and 
> removes it from tasks_to_run. Now, tasks_to_run = [B]
> 5. The deadlock check that follows the for loop now sees not_ready == {B} == 
> set(tasks_to_run), so it wrongly marks the DAG run as deadlocked and the 
> backfill job fails:
>     # If the set of tasks that aren't ready ever equals the set of
>     # tasks to run, then the backfill is deadlocked
>     if not_ready and not_ready == set(tasks_to_run):
>         deadlocked.update(tasks_to_run.values())
>         tasks_to_run.clear()
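The five steps above can be reproduced deterministically with a minimal sketch. It assumes a plain dict as a stand-in for the metastore and a callback that plays the worker finishing A between the two reads; `racy_pass`, `finish_a` and `UPSTREAM` are hypothetical names, not Airflow APIs.

```python
# Hypothetical simulation of the interleaving; a dict stands in for the
# metastore and none of these names are Airflow APIs.
UPSTREAM = {"A": [], "B": ["A"]}  # B depends on A


def racy_pass(tasks_to_run, db, worker_step):
    """One pass of the loop that re-reads each task's state individually."""
    not_ready = set()
    for i, key in enumerate(list(tasks_to_run)):
        state = db[key]  # plays the role of ti.refresh_from_db()
        if state == "success":
            del tasks_to_run[key]  # step 4: A is removed
        elif state is None and any(db[u] != "success" for u in UPSTREAM[key]):
            not_ready.add(key)  # step 2: B is not runnable yet
        if i == 0:
            worker_step(db)  # step 3: the worker finishes A between reads
    # step 5: the deadlock check from the loop
    return bool(not_ready) and not_ready == set(tasks_to_run)


def finish_a(db):
    db["A"] = "success"


db = {"A": "running", "B": None}
tasks_to_run = {"B": "ti_B", "A": "ti_A"}  # step 1: order [B, A]
print(racy_pass(tasks_to_run, db, finish_a))  # True: spurious deadlock
```

If `worker_step` does nothing, A stays in tasks_to_run, not_ready ({B}) differs from the set of tasks to run ({A, B}), and no deadlock is reported; the false positive appears only when A's state changes mid-pass.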
> *How to fix:*
> The main reason for the race condition is that we are not synchronizing 
> access to the task instances in a DAG run. 
> There are 3+n actors in the backfill system:
> * Backfill loop
> * LocalExecutor object
> * Metastore DB
> * n LocalWorker processes
> The backfill loop and local executor run in the same process, so we don't 
> have to worry about synchronization between them. But we need to synchronize 
> access between the other actors. The channels of communication in this 
> context are:
> * Backfill loop <-> LocalExecutor = event_buffer
> * Backfill loop <-> Metastore = SQLAlchemy ORM
> * Metastore <-> LocalWorker = SQLAlchemy ORM
> This means the backfill loop has two versions of each task instance's state: 
> one from the LocalExecutor event buffer (which gets updated when the worker 
> completes a task) and another from the metastore (which the worker also 
> writes to).
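To illustrate how these two versions can disagree, here is a hypothetical sketch. It assumes the worker writes to the metastore before its result reaches the executor's event_buffer; the dict, queue and function names are illustrative stand-ins, not Airflow's classes.

```python
import queue

# Hypothetical stand-ins, not Airflow classes: a dict plays the metastore and
# a Queue plays the channel that feeds the LocalExecutor event_buffer.
metastore = {"A": "running"}
result_channel = queue.Queue()
event_buffer = {}


def worker_writes_db(task_id):
    """Assumed first step of a worker completing a task: persist the state."""
    metastore[task_id] = "success"


def worker_reports(task_id):
    """Assumed second step: report the result back to the executor process."""
    result_channel.put((task_id, "success"))


def executor_sync():
    """Drain reported results into event_buffer, as a heartbeat might."""
    while True:
        try:
            task_id, state = result_channel.get_nowait()
        except queue.Empty:
            break
        event_buffer[task_id] = state


worker_writes_db("A")
executor_sync()
print(metastore["A"], event_buffer.get("A"))  # success None: views disagree
worker_reports("A")
executor_sync()
print(metastore["A"], event_buffer.get("A"))  # success success
```

The sketch only shows that the two channels can be out of step for a while, which is why one of them has to be treated as the source of truth.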
> If we consider the metastore to be the source of truth, we can synchronize 
> access by reading the state of all task instances for the DAG run in a single 
> query before the "for key, ti in list(tasks_to_run.items())" loop and 
> removing individual task instance refreshes inside the loop.
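The proposed fix can be sketched with an in-memory SQLite table: read every task state for the DAG run in one query, then make every decision in the pass against that snapshot. The schema, `read_states` and `snapshot_pass` are hypothetical, and this is not necessarily how the fix was implemented in Airflow.

```python
import sqlite3

# Hypothetical table loosely modeled on a task_instance table; only the
# columns needed here.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)"
)
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [("dag", "A", "2016-01-02", "running"), ("dag", "B", "2016-01-02", None)],
)
UPSTREAM = {"A": [], "B": ["A"]}  # B depends on A


def read_states(conn, dag_id, execution_date):
    """A single query: one consistent view of every task state in the run."""
    rows = conn.execute(
        "SELECT task_id, state FROM task_instance "
        "WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    )
    return dict(rows.fetchall())


def snapshot_pass(tasks_to_run, states, worker_step):
    """One pass of the loop using the snapshot; no per-task refresh."""
    not_ready = set()
    for i, key in enumerate(list(tasks_to_run)):
        state = states[key]
        if state == "success":
            del tasks_to_run[key]
        elif state is None and any(states[u] != "success" for u in UPSTREAM[key]):
            not_ready.add(key)
        if i == 0:
            worker_step()  # the worker finishes A mid-pass, as before
    return bool(not_ready) and not_ready == set(tasks_to_run)


def finish_a():
    conn.execute("UPDATE task_instance SET state = 'success' WHERE task_id = 'A'")
    conn.commit()


tasks_to_run = {"B": "ti_B", "A": "ti_A"}
states = read_states(conn, "dag", "2016-01-02")
print(snapshot_pass(tasks_to_run, states, finish_a))  # False: no deadlock
print(read_states(conn, "dag", "2016-01-02")["A"])  # success, seen next pass
```

The snapshot can be stale by the time it is used; what matters is that a single pass never mixes states read at different times, and the next pass re-reads the table and picks up A's completion.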



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
