Repository: incubator-airflow Updated Branches: refs/heads/master 92064398c -> 635c97a60
[AIRFLOW-316] Always check DB state for Backfill Job execution Closes #1654 from aoen/ddavydov/dont_skip_db_state_check_for_subdag When determining which task instances have not yet completed execution, backfill jobs now always check the DB state rather than only their local state. This avoids potential race conditions, e.g. two backfill jobs running the same task instance. Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/635c97a6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/635c97a6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/635c97a6 Branch: refs/heads/master Commit: 635c97a60636c223d374452258465fdc337f0913 Parents: 9206439 Author: Dan Davydov <[email protected]> Authored: Wed Jul 6 14:44:21 2016 -0700 Committer: Dan Davydov <[email protected]> Committed: Wed Jul 6 14:44:28 2016 -0700 ---------------------------------------------------------------------- airflow/jobs.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/635c97a6/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index b3270b0..a0749c0 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -896,17 +896,16 @@ class BackfillJob(BaseJob): ti.execution_date == (start_date or ti.start_date)) # The task was already marked successful or skipped by a # different Job. Don't rerun it. 
- if key not in started: - if ti.state == State.SUCCESS: - succeeded.add(key) - tasks_to_run.pop(key) - session.commit() - continue - elif ti.state == State.SKIPPED: - skipped.add(key) - tasks_to_run.pop(key) - session.commit() - continue + if ti.state == State.SUCCESS: + succeeded.add(key) + tasks_to_run.pop(key) + session.commit() + continue + elif ti.state == State.SKIPPED: + skipped.add(key) + tasks_to_run.pop(key) + session.commit() + continue # Is the task runnable? -- then run it if ti.is_queueable(
