Repository: incubator-airflow Updated Branches: refs/heads/master 9c67ee842 -> e9f3fdc52
[AIRFLOW-2566] Change backfill to rerun failed tasks Closes #3464 from feng-tao/airflow-2566 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e9f3fdc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e9f3fdc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e9f3fdc5 Branch: refs/heads/master Commit: e9f3fdc52cb53f3ac3e9721e5128d17d1c5c418c Parents: 9c67ee8 Author: Tao feng <[email protected]> Authored: Fri Jun 8 08:28:22 2018 -0700 Committer: Maxime Beauchemin <[email protected]> Committed: Fri Jun 8 08:28:22 2018 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 27 +++++++++-- airflow/jobs.py | 83 +++++++++++++++++++++++---------- airflow/models.py | 4 +- airflow/settings.py | 2 +- tests/jobs.py | 116 +++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 202 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 2742df5..b56e325 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -214,6 +214,7 @@ def backfill(args, dag=None): delay_on_limit_secs=args.delay_on_limit, verbose=args.verbose, conf=run_conf, + rerun_failed_tasks=args.rerun_failed_tasks, ) @@ -1375,10 +1376,19 @@ class CLIFactory(object): default=1.0), 'reset_dag_run': Arg( ("--reset_dagruns",), - ("if set, the backfill will delete existing " - "backfill-related DAG runs and start " - "anew with fresh, running DAG runs"), + ( + "if set, the backfill will delete existing " + "backfill-related DAG runs and start " + "anew with fresh, running DAG runs"), + "store_true"), + 'rerun_failed_tasks': Arg( + ("--rerun_failed_tasks",), + ( + "if set, the 
backfill will auto-rerun " + "all the failed tasks for the backfill date range " + "instead of throwing exceptions"), "store_true"), + # list_tasks 'tree': Arg(("-t", "--tree"), "Tree view", "store_true"), # list_dags @@ -1693,13 +1703,20 @@ class CLIFactory(object): subparsers = ( { 'func': backfill, - 'help': "Run subsections of a DAG for a specified date range", + 'help': "Run subsections of a DAG for a specified date range. " + "If the --reset_dagruns option is used," + " backfill will first prompt users whether airflow " + "should clear all the previous dag_run and task_instances " + "within the backfill date range. " + "If --rerun_failed_tasks is used, backfill " + "will auto re-run the previous failed task instances" + " within the backfill date range.", 'args': ( 'dag_id', 'task_regex', 'start_date', 'end_date', 'mark_success', 'local', 'donot_pickle', 'bf_ignore_dependencies', 'bf_ignore_first_depends_on_past', 'subdir', 'pool', 'delay_on_limit', 'dry_run', 'verbose', 'conf', - 'reset_dag_run' + 'reset_dag_run', 'rerun_failed_tasks', ) }, { 'func': list_tasks, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 827349a..ad114ab 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1957,7 +1957,36 @@ class BackfillJob(BaseJob): delay_on_limit_secs=1.0, verbose=False, conf=None, + rerun_failed_tasks=False, *args, **kwargs): + """ + :param dag: DAG object. + :type dag: airflow.models.DAG + :param start_date: start date for the backfill date range. + :type start_date: datetime + :param end_date: end date for the backfill date range. + :type end_date: datetime + :param mark_success: whether to auto-mark the tasks as successful.
+ :type mark_success: bool + :param donot_pickle: whether to skip pickling + :type donot_pickle: bool + :param ignore_first_depends_on_past: whether to ignore depends_on_past + :type ignore_first_depends_on_past: bool + :param ignore_task_deps: whether to ignore the task dependencies + :type ignore_task_deps: bool + :param pool: + :type pool: list + :param delay_on_limit_secs: + :param verbose: whether to display verbose messages on the backfill console + :type verbose: bool + :param conf: a dictionary of user-supplied key-value pairs for the backfill + :type conf: dict + :param rerun_failed_tasks: whether to automatically rerun + failed and upstream-failed tasks in the backfill + :type rerun_failed_tasks: bool + :param args: + :param kwargs: + """ self.dag = dag self.dag_id = dag.dag_id self.bf_start_date = start_date @@ -1970,6 +1999,7 @@ class BackfillJob(BaseJob): self.delay_on_limit_secs = delay_on_limit_secs self.verbose = verbose self.conf = conf + self.rerun_failed_tasks = rerun_failed_tasks super(BackfillJob, self).__init__(*args, **kwargs) def _update_counters(self, ti_status): @@ -2216,15 +2246,6 @@ class BackfillJob(BaseJob): self.log.debug( "Task instance to run %s state %s", ti, ti.state) - # guard against externally modified tasks instances or - # in case max concurrency has been reached at task runtime - if ti.state == State.NONE: - self.log.warning( - "FIXME: task instance {} state was set to None " - "externally. This should not happen" - ) - ti.set_state(State.SCHEDULED, session=session) - # The task was already marked successful or skipped by a # different Job. Don't rerun it.
if ti.state == State.SUCCESS: @@ -2241,20 +2262,36 @@ class BackfillJob(BaseJob): if key in ti_status.running: ti_status.running.pop(key) continue - elif ti.state == State.FAILED: - self.log.error("Task instance %s failed", ti) - ti_status.failed.add(key) - ti_status.to_run.pop(key) - if key in ti_status.running: - ti_status.running.pop(key) - continue - elif ti.state == State.UPSTREAM_FAILED: - self.log.error("Task instance %s upstream failed", ti) - ti_status.failed.add(key) - ti_status.to_run.pop(key) - if key in ti_status.running: - ti_status.running.pop(key) - continue + + # guard against externally modified task instances or + # in case max concurrency has been reached at task runtime + elif ti.state == State.NONE: + self.log.warning( + "FIXME: task instance %s state was set to None " + "externally. This should not happen", ti + ) + ti.set_state(State.SCHEDULED, session=session) + if self.rerun_failed_tasks: + # Rerun failed or upstream-failed tasks + if ti.state in (State.FAILED, State.UPSTREAM_FAILED): + self.log.error("Task instance %s " + "with state %s", + ti, ti.state) + if key in ti_status.running: + ti_status.running.pop(key) + # Reset the failed task in backfill to scheduled state + ti.set_state(State.SCHEDULED, session=session) + else: + # Default behaviour which works for subdag.
+ if ti.state in (State.FAILED, State.UPSTREAM_FAILED): + self.log.error("Task instance %s " + "with %s state", + ti, ti.state) + ti_status.failed.add(key) + ti_status.to_run.pop(key) + if key in ti_status.running: + ti_status.running.pop(key) + continue backfill_context = DepContext( deps=RUN_DEPS, http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index c26ec01..312074c 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4010,6 +4010,7 @@ class DAG(BaseDag, LoggingMixin): delay_on_limit_secs=1.0, verbose=False, conf=None, + rerun_failed_tasks=False, ): """ Runs the DAG. @@ -4059,6 +4060,7 @@ class DAG(BaseDag, LoggingMixin): delay_on_limit_secs=delay_on_limit_secs, verbose=verbose, conf=conf, + rerun_failed_tasks=rerun_failed_tasks, ) job.run() @@ -4950,7 +4952,7 @@ class DagRun(Base, LoggingMixin): tis = self.get_task_instances(session=session) - self.log.info("Updating state for %s considering %s task(s)", self, len(tis)) + self.log.debug("Updating state for %s considering %s task(s)", self, len(tis)) for ti in list(tis): # skip in db? http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 2abe568..7c0376d 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -151,7 +151,7 @@ def configure_orm(disable_connection_pool=False): pool_connections = conf.getboolean('core', 'SQL_ALCHEMY_POOL_ENABLED') if disable_connection_pool or not pool_connections: engine_args['poolclass'] = NullPool - log.info("settings.configure_orm(): Using NullPool") + log.debug("settings.configure_orm(): Using NullPool") elif 'sqlite' not in SQL_ALCHEMY_CONN: # Engine args not supported by sqlite.
# If no config value is defined for the pool size, select a reasonable value. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e9f3fdc5/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index 8e7f056..f534b65 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -214,6 +214,122 @@ class BackfillJobTest(unittest.TestCase): self.assertEqual(conf, dr[0].conf) + def test_backfill_rerun_failed_tasks(self): + dag = DAG( + dag_id='test_backfill_rerun_failed', + start_date=DEFAULT_DATE, + schedule_interval='@daily') + + with dag: + DummyOperator( + task_id='test_backfill_rerun_failed_task-1', + dag=dag) + + dag.clear() + + executor = TestExecutor(do_update=True) + + job = BackfillJob(dag=dag, + executor=executor, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=2), + ) + job.run() + + ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'), + execution_date=DEFAULT_DATE) + ti.refresh_from_db() + ti.set_state(State.FAILED) + + job = BackfillJob(dag=dag, + executor=executor, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=2), + rerun_failed_tasks=True + ) + job.run() + ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'), + execution_date=DEFAULT_DATE) + ti.refresh_from_db() + self.assertEqual(ti.state, State.SUCCESS) + + def test_backfill_rerun_upstream_failed_tasks(self): + dag = DAG( + dag_id='test_backfill_rerun_upstream_failed', + start_date=DEFAULT_DATE, + schedule_interval='@daily') + + with dag: + t1 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-1', + dag=dag) + t2 = DummyOperator(task_id='test_backfill_rerun_upstream_failed_task-2', + dag=dag) + t1.set_upstream(t2) + + dag.clear() + executor = TestExecutor(do_update=True) + + job = BackfillJob(dag=dag, + executor=executor, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=2), + ) + job.run() + + ti = 
TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'), + execution_date=DEFAULT_DATE) + ti.refresh_from_db() + ti.set_state(State.UPSTREAM_FAILED) + + job = BackfillJob(dag=dag, + executor=executor, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=2), + rerun_failed_tasks=True + ) + job.run() + ti = TI(task=dag.get_task('test_backfill_rerun_upstream_failed_task-1'), + execution_date=DEFAULT_DATE) + ti.refresh_from_db() + self.assertEqual(ti.state, State.SUCCESS) + + def test_backfill_rerun_failed_tasks_without_flag(self): + dag = DAG( + dag_id='test_backfill_rerun_failed_without_flag', + start_date=DEFAULT_DATE, + schedule_interval='@daily') + + with dag: + DummyOperator( + task_id='test_backfill_rerun_failed_task-1', + dag=dag) + + dag.clear() + + executor = TestExecutor(do_update=True) + + job = BackfillJob(dag=dag, + executor=executor, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=2), + ) + job.run() + + ti = TI(task=dag.get_task('test_backfill_rerun_failed_task-1'), + execution_date=DEFAULT_DATE) + ti.refresh_from_db() + ti.set_state(State.FAILED) + + job = BackfillJob(dag=dag, + executor=executor, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE + datetime.timedelta(days=2), + rerun_failed_tasks=False + ) + + with self.assertRaises(AirflowException): + job.run() + def test_backfill_ordered_concurrent_execute(self): dag = DAG( dag_id='test_backfill_ordered_concurrent_execute',
