Repository: incubator-airflow Updated Branches: refs/heads/master dd2bc8cb9 -> c3c4a8fdc
[AIRFLOW-511][Airflow 511] add success/failure callbacks on dag level Closes #2934 from Acehaidrey/AIRFLOW-511 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c3c4a8fd Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c3c4a8fd Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c3c4a8fd Branch: refs/heads/master Commit: c3c4a8fdce0574d42be44f070d43807238daa9e8 Parents: dd2bc8c Author: Ace Haidrey <[email protected]> Authored: Fri Jan 19 18:53:27 2018 +0100 Committer: Bolke de Bruin <[email protected]> Committed: Fri Jan 19 18:53:27 2018 +0100 ---------------------------------------------------------------------- airflow/jobs.py | 2 ++ airflow/models.py | 43 ++++++++++++++++++++++++++++++++++ tests/models.py | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index ae6969e..1b5e661 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -772,6 +772,8 @@ class SchedulerJob(BaseJob): dr.start_date < timezone.utcnow() - dag.dagrun_timeout): dr.state = State.FAILED dr.end_date = timezone.utcnow() + dag.handle_callback(dr, success=False, reason='dagrun_timeout', + session=session) timedout_runs += 1 session.commit() if len(active_runs) - timedout_runs >= dag.max_active_runs: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index c5233ec..5de18b2 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2961,6 +2961,12 @@ class DAG(BaseDag, LoggingMixin): :type orientation: 
string :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True :type catchup: bool + :param on_failure_callback: A function to be called when a DagRun of this dag fails. + A context dictionary is passed as a single parameter to this function. + :type on_failure_callback: callable + :param on_success_callback: Much like the ``on_failure_callback`` except + that it is executed when the dag succeeds. + :type on_success_callback: callable """ def __init__( @@ -2981,6 +2987,7 @@ class DAG(BaseDag, LoggingMixin): default_view=configuration.get('webserver', 'dag_default_view').lower(), orientation=configuration.get('webserver', 'dag_orientation'), catchup=configuration.getboolean('scheduler', 'catchup_by_default'), + on_success_callback=None, on_failure_callback=None, params=None): self.user_defined_macros = user_defined_macros @@ -3053,6 +3060,8 @@ class DAG(BaseDag, LoggingMixin): self.is_subdag = False # DagBag.bag_dag() will set this to True if appropriate self.partial = False + self.on_success_callback = on_success_callback + self.on_failure_callback = on_failure_callback self._comps = { 'dag_id', @@ -3315,6 +3324,35 @@ class DAG(BaseDag, LoggingMixin): return qry.value('is_paused') @provide_session + def handle_callback(self, dagrun, success=True, reason=None, session=None): + """ + Triggers the appropriate callback depending on the value of success, namely the + on_failure_callback or on_success_callback. This method gets the context of a + single TaskInstance part of this DagRun and passes that to the callable along + with a 'reason', primarily to differentiate DagRun failures. + .. 
note:: + The logs end up in $AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log + :param dagrun: DagRun object + :param success: Flag to specify if failure or success callback should be called + :param reason: Completion reason + :param session: Database session + """ + callback = self.on_success_callback if success else self.on_failure_callback + if callback: + self.log.info('Executing dag callback function: {}'.format(callback)) + tis = dagrun.get_task_instances(session=session) + ti = tis[-1] # get last TaskInstance of DagRun + # certain task instance attributes are transient so must save them + # -- especially during timeouts they're lost + if not hasattr(ti, 'task'): + d = dagrun.dag or DagBag().get_dag(dag_id=dagrun.dag_id) + task = d.get_task(ti.task_id) + ti.task = task + context = ti.get_template_context(session=session) + context.update({'reason': reason}) + callback(context) + + @provide_session def get_active_runs(self, session=None): """ Returns a list of dag run execution dates currently running @@ -4762,18 +4800,23 @@ class DagRun(Base, LoggingMixin): any(r.state in (State.FAILED, State.UPSTREAM_FAILED) for r in roots)): self.log.info('Marking run %s failed', self) self.state = State.FAILED + dag.handle_callback(self, success=False, reason='task_failure', + session=session) # if all roots succeeded and no unfinished tasks, the run succeeded elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED) for r in roots): self.log.info('Marking run %s successful', self) self.state = State.SUCCESS + dag.handle_callback(self, success=True, reason='success', session=session) # if *all tasks* are deadlocked, the run failed elif (unfinished_tasks and none_depends_on_past and none_task_concurrency and no_dependencies_met): self.log.info('Deadlock; marking run %s failed', self) self.state = State.FAILED + dag.handle_callback(self, success=False, reason='all_tasks_deadlocked', + session=session) # finally, if the roots aren't done, the dag 
is still running else: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c3c4a8fd/tests/models.py ---------------------------------------------------------------------- diff --git a/tests/models.py b/tests/models.py index f0879eb..11bf7c9 100644 --- a/tests/models.py +++ b/tests/models.py @@ -645,6 +645,68 @@ class DagRunTest(unittest.TestCase): self.assertEqual(dr.state, State.RUNNING) self.assertEqual(dr2.state, State.RUNNING) + def test_dagrun_success_callback(self): + def on_success_callable(context): + self.assertEqual( + context['dag_run'].dag_id, + 'test_dagrun_success_callback' + ) + + dag = DAG( + dag_id='test_dagrun_success_callback', + start_date=datetime.datetime(2017, 1, 1), + on_success_callback=on_success_callable, + ) + dag_task1 = DummyOperator( + task_id='test_state_succeeded1', + dag=dag) + dag_task2 = DummyOperator( + task_id='test_state_succeeded2', + dag=dag) + dag_task1.set_downstream(dag_task2) + + initial_task_states = { + 'test_state_succeeded1': State.SUCCESS, + 'test_state_succeeded2': State.SUCCESS, + } + + dag_run = self.create_dag_run(dag=dag, + state=State.RUNNING, + task_states=initial_task_states) + updated_dag_state = dag_run.update_state() + self.assertEqual(State.SUCCESS, updated_dag_state) + + def test_dagrun_failure_callback(self): + def on_failure_callable(context): + self.assertEqual( + context['dag_run'].dag_id, + 'test_dagrun_failure_callback' + ) + + dag = DAG( + dag_id='test_dagrun_failure_callback', + start_date=datetime.datetime(2017, 1, 1), + on_failure_callback=on_failure_callable, + ) + dag_task1 = DummyOperator( + task_id='test_state_succeeded1', + dag=dag) + dag_task2 = DummyOperator( + task_id='test_state_failed2', + dag=dag) + + initial_task_states = { + 'test_state_succeeded1': State.SUCCESS, + 'test_state_failed2': State.FAILED, + } + dag_task1.set_downstream(dag_task2) + + dag_run = self.create_dag_run(dag=dag, + state=State.RUNNING, + task_states=initial_task_states) + updated_dag_state = 
dag_run.update_state() + self.assertEqual(State.FAILED, updated_dag_state) + def test_get_task_instance_on_empty_dagrun(self): """ Make sure that a proper value is returned when a dagrun has no task instances
