Repository: incubator-airflow
Updated Branches:
  refs/heads/master 9c67ee842 -> e9f3fdc52


[AIRFLOW-2566] Change backfill to rerun failed tasks

Closes #3464 from feng-tao/airflow-2566


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e9f3fdc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e9f3fdc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e9f3fdc5

Branch: refs/heads/master
Commit: e9f3fdc52cb53f3ac3e9721e5128d17d1c5c418c
Parents: 9c67ee8
Author: Tao feng <[email protected]>
Authored: Fri Jun 8 08:28:22 2018 -0700
Committer: Maxime Beauchemin <[email protected]>
Committed: Fri Jun 8 08:28:22 2018 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py  |  27 +++++++++--
 airflow/jobs.py     |  83 +++++++++++++++++++++++----------
 airflow/models.py   |   4 +-
 airflow/settings.py |   2 +-
 tests/jobs.py       | 116 +++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 202 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 2742df5..b56e325 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -214,6 +214,7 @@ def backfill(args, dag=None):
             delay_on_limit_secs=args.delay_on_limit,
             verbose=args.verbose,
             conf=run_conf,
+            rerun_failed_tasks=args.rerun_failed_tasks,
         )
 
 
@@ -1375,10 +1376,19 @@ class CLIFactory(object):
             default=1.0),
         'reset_dag_run': Arg(
             ("--reset_dagruns",),
-            ("if set, the backfill will delete existing "
-             "backfill-related DAG runs and start "
-             "anew with fresh, running DAG runs"),
+            (
+                "if set, the backfill will delete existing "
+                "backfill-related DAG runs and start "
+                "anew with fresh, running DAG runs"),
+            "store_true"),
+        'rerun_failed_tasks': Arg(
+            ("--rerun_failed_tasks",),
+            (
+                "if set, the backfill will auto-rerun "
+                "all the failed tasks for the backfill date range "
+                "instead of throwing exceptions"),
             "store_true"),
+
         # list_tasks
         'tree': Arg(("-t", "--tree"), "Tree view", "store_true"),
         # list_dags
@@ -1693,13 +1703,20 @@ class CLIFactory(object):
     subparsers = (
         {
             'func': backfill,
-            'help': "Run subsections of a DAG for a specified date range",
+            'help': "Run subsections of a DAG for a specified date range. "
+                    "If reset_dag_run option is used,"
+                    " backfill will first prompt users whether airflow "
+                    "should clear all the previous dag_run and task_instances "
+                    "within the backfill date range."
+                    "If rerun_failed_tasks is used, backfill "
+                    "will auto re-run the previous failed task instances"
+                    " within the backfill date range.",
             'args': (
                 'dag_id', 'task_regex', 'start_date', 'end_date',
                 'mark_success', 'local', 'donot_pickle',
                 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past',
                 'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf',
-                'reset_dag_run'
+                'reset_dag_run', 'rerun_failed_tasks',
             )
         }, {
             'func': list_tasks,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 827349a..ad114ab 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1957,7 +1957,36 @@ class BackfillJob(BaseJob):
             delay_on_limit_secs=1.0,
             verbose=False,
             conf=None,
+            rerun_failed_tasks=False,
             *args, **kwargs):
+        """
+        :param dag: DAG object.
+        :type dag: `class DAG`.
+        :param start_date: start date for the backfill date range.
+        :type start_date: datetime.
+        :param end_date: end date for the backfill date range.
+        :type end_date: datetime
+        :param mark_success: flag whether to mark the task auto success.
+        :type mark_success: bool
+        :param donot_pickle: whether pickle
+        :type donot_pickle: bool
+        :param ignore_first_depends_on_past: whether to ignore depend on past
+        :type ignore_first_depends_on_past: bool
+        :param ignore_task_deps: whether to ignore the task dependency
+        :type ignore_task_deps: bool
+        :param pool:
+        :type pool: list
+        :param delay_on_limit_secs:
+        :param verbose:
+        :type verbose: flag to whether display verbose message to backfill console
+        :param conf: a dictionary which user could pass k-v pairs for backfill
+        :type conf: dictionary
+        :param rerun_failed_tasks: flag to whether to
+                                   auto rerun the failed task in backfill
+        :type rerun_failed_tasks: bool
+        :param args:
+        :param kwargs:
+        """
         self.dag = dag
         self.dag_id = dag.dag_id
         self.bf_start_date = start_date
@@ -1970,6 +1999,7 @@ class BackfillJob(BaseJob):
         self.delay_on_limit_secs = delay_on_limit_secs
         self.verbose = verbose
         self.conf = conf
+        self.rerun_failed_tasks = rerun_failed_tasks
         super(BackfillJob, self).__init__(*args, **kwargs)
 
     def _update_counters(self, ti_status):
@@ -2216,15 +2246,6 @@ class BackfillJob(BaseJob):
                     self.log.debug(
                         "Task instance to run %s state %s", ti, ti.state)
 
-                    # guard against externally modified tasks instances or
-                    # in case max concurrency has been reached at task runtime
-                    if ti.state == State.NONE:
-                        self.log.warning(
-                            "FIXME: task instance {} state was set to None "
-                            "externally. This should not happen"
-                        )
-                        ti.set_state(State.SCHEDULED, session=session)
-
                     # The task was already marked successful or skipped by a
                     # different Job. Don't rerun it.
                     if ti.state == State.SUCCESS:
@@ -2241,20 +2262,36 @@ class BackfillJob(BaseJob):
                         if key in ti_status.running:
                             ti_status.running.pop(key)
                         continue
-                    elif ti.state == State.FAILED:
-                        self.log.error("Task instance %s failed", ti)
-                        ti_status.failed.add(key)
-                        ti_status.to_run.pop(key)
-                        if key in ti_status.running:
-                            ti_status.running.pop(key)
-                        continue
-                    elif ti.state == State.UPSTREAM_FAILED:
-                        self.log.error("Task instance %s upstream failed", ti)
-                        ti_status.failed.add(key)
-                        ti_status.to_run.pop(key)
-                        if key in ti_status.running:
-                            ti_status.running.pop(key)
-                        continue
+
+                    # guard against externally modified tasks instances or
+                    # in case max concurrency has been reached at task runtime
+                    elif ti.state == State.NONE:
+                        self.log.warning(
+                            "FIXME: task instance {} state was set to None "
+                            "externally. This should not happen"
+                        )
+                        ti.set_state(State.SCHEDULED, session=session)
+                    if self.rerun_failed_tasks:
+                        # Rerun failed tasks or upstreamed failed tasks
+                        if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
+                            self.log.error("Task instance {ti} "
+                                           "with state {state}".format(ti=ti,
+                                                                       state=ti.state))
+                            if key in ti_status.running:
+                                ti_status.running.pop(key)
+                            # Reset the failed task in backfill to scheduled state
+                            ti.set_state(State.SCHEDULED, session=session)
+                    else:
+                        # Default behaviour which works for subdag.
+                        if ti.state in (State.FAILED, State.UPSTREAM_FAILED):
+                            self.log.error("Task instance {ti} "
+                                           "with {state} state".format(ti=ti,
+                                                                       state=ti.state))
+                            ti_status.failed.add(key)
+                            ti_status.to_run.pop(key)
+                            if key in ti_status.running:
+                                ti_status.running.pop(key)
+                            continue
 
                     backfill_context = DepContext(
                         deps=RUN_DEPS,

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index c26ec01..312074c 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4010,6 +4010,7 @@ class DAG(BaseDag, LoggingMixin):
             delay_on_limit_secs=1.0,
             verbose=False,
             conf=None,
+            rerun_failed_tasks=False,
     ):
         """
         Runs the DAG.
@@ -4059,6 +4060,7 @@ class DAG(BaseDag, LoggingMixin):
             delay_on_limit_secs=delay_on_limit_secs,
             verbose=verbose,
             conf=conf,
+            rerun_failed_tasks=rerun_failed_tasks,
         )
         job.run()
 
@@ -4950,7 +4952,7 @@ class DagRun(Base, LoggingMixin):
 
         tis = self.get_task_instances(session=session)
 
-        self.log.info("Updating state for %s considering %s task(s)", self, len(tis))
+        self.log.debug("Updating state for %s considering %s task(s)", self, len(tis))
 
         for ti in list(tis):
             # skip in db?

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 2abe568..7c0376d 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -151,7 +151,7 @@ def configure_orm(disable_connection_pool=False):
     pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED')
     if disable_connection_pool or not pool_connections:
         engine_args['poolclass'] = NullPool
-        log.info("settings.configure_orm(): Using NullPool")
+        log.debug("settings.configure_orm(): Using NullPool")
     elif 'sqlite' not in SQL_ALCHEMY_CONN:
         # Engine args not supported by sqlite.
         # If no config value is defined for the pool size, select a reasonable value.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index 8e7f056..f534b65 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -214,6 +214,122 @@ class BackfillJobTest(unittest.TestCase):
 
         self.assertEqual(conf, dr[0].conf)
 
+    def test_backfill_rerun_failed_tasks(self):
+        dag = DAG(
+            dag_id='test_backfill_rerun_failed',
+            start_date=DEFAULT_DATE,
+            schedule_interval='@daily')
+
+        with dag:
+            DummyOperator(
+                task_id='test_backfill_rerun_failed_task-1',
+                dag=dag)
+
+        dag.clear()
+
+        executor = TestExecutor(do_update=True)
+
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          )
+        job.run()
+
+        ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'),
+                execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        ti.set_state(State.FAILED)
+
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          rerun_failed_tasks=True
+                          )
+        job.run()
+        ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'),
+                execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        self.assertEquals(ti.state, State.SUCCESS)
+
+    def test_backfill_rerun_upstream_failed_tasks(self):
+            dag = DAG(
+                dag_id='test_backfill_rerun_upstream_failed',
+                start_date=DEFAULT_DATE,
+                schedule_interval='@daily')
+
+            with dag:
+                t1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1',
+                                   dag=dag)
+                t2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2',
+                                   dag=dag)
+                t1.set_upstream(t2)
+
+            dag.clear()
+            executor = TestExecutor(do_update=True)
+
+            job = BackfillJob(dag=dag,
+                              executor=executor,
+                              start_date=DEFAULT_DATE,
+                              end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                              )
+            job.run()
+
+            ti = TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'),
+                    execution_date=DEFAULT_DATE)
+            ti.refresh_from_db()
+            ti.set_state(State.UPSTREAM_FAILED)
+
+            job = BackfillJob(dag=dag,
+                              executor=executor,
+                              start_date=DEFAULT_DATE,
+                              end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                              rerun_failed_tasks=True
+                              )
+            job.run()
+            ti = TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'),
+                    execution_date=DEFAULT_DATE)
+            ti.refresh_from_db()
+            self.assertEquals(ti.state, State.SUCCESS)
+
+    def test_backfill_rerun_failed_tasks_without_flag(self):
+        dag = DAG(
+            dag_id='test_backfill_rerun_failed',
+            start_date=DEFAULT_DATE,
+            schedule_interval='@daily')
+
+        with dag:
+            DummyOperator(
+                task_id='test_backfill_rerun_failed_task-1',
+                dag=dag)
+
+        dag.clear()
+
+        executor = TestExecutor(do_update=True)
+
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          )
+        job.run()
+
+        ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'),
+                execution_date=DEFAULT_DATE)
+        ti.refresh_from_db()
+        ti.set_state(State.FAILED)
+
+        job = BackfillJob(dag=dag,
+                          executor=executor,
+                          start_date=DEFAULT_DATE,
+                          end_date=DEFAULT_DATE + datetime.timedelta(days=2),
+                          rerun_failed_tasks=False
+                          )
+
+        with self.assertRaises(AirflowException):
+            job.run()
+
     def test_backfill_ordered_concurrent_execute(self):
         dag = DAG(
             dag_id='test_backfill_ordered_concurrent_execute',

Reply via email to