ashb commented on a change in pull request #6954: [AIRFLOW-4355] removed task
should not lead to dagrun success
URL: https://github.com/apache/airflow/pull/6954#discussion_r365209436
##########
File path: airflow/models/dagrun.py
##########
@@ -311,32 +313,51 @@ def update_state(self, session=None):
leaf_tis = [ti for ti in tis if ti.task_id in {t.task_id for t in
dag.leaves}]
- # if all roots finished and at least one failed, the run failed
- if not unfinished_tasks and any(
- leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for leaf_ti
in leaf_tis
- ):
- self.log.info('Marking run %s failed', self)
- self.set_state(State.FAILED)
- dag.handle_callback(self, success=False, reason='task_failure',
- session=session)
-
- # if all leafs succeeded and no unfinished tasks, the run succeeded
- elif not unfinished_tasks and all(
- leaf_ti.state in {State.SUCCESS, State.SKIPPED} for leaf_ti in
leaf_tis
- ):
- self.log.info('Marking run %s successful', self)
- self.set_state(State.SUCCESS)
- dag.handle_callback(self, success=True, reason='success',
session=session)
-
- # if *all tasks* are deadlocked, the run failed
- elif (unfinished_tasks and none_depends_on_past and
- none_task_concurrency and no_dependencies_met):
- self.log.info('Deadlock; marking run %s failed', self)
- self.set_state(State.FAILED)
- dag.handle_callback(self, success=False,
reason='all_tasks_deadlocked',
- session=session)
-
- # finally, if the roots aren't done, the dag is still running
+ if len(tis) == len(dag.tasks):
+ # if all roots finished and at least one failed, the run failed
+ if not unfinished_tasks and any(
+ leaf_ti.state in {State.FAILED, State.UPSTREAM_FAILED} for
leaf_ti in leaf_tis
+ ):
+ self.log.info('Marking run %s failed', self)
+ self.set_state(State.FAILED)
+ dag.handle_callback(self, success=False, reason='task_failure',
+ session=session)
+
+ # if all leafs succeeded and no unfinished tasks, the run succeeded
+ elif not unfinished_tasks and all(
+ leaf_ti.state in {State.SUCCESS, State.SKIPPED} for leaf_ti in
leaf_tis
+ ):
+ # removed tasks count as FAILURE
+ if conf.getboolean('scheduler',
'REMOVED_TASKS_LEAD_TO_DAGRUN_FAILURE', fallback=False):
+ removed_tasks = self.get_task_instances(
+ state=State.REMOVED,
+ session=session
+ )
+ if removed_tasks and len(removed_tasks) > 0:
+ self.log.info('Removed_tasks; Marking run %s failed',
self)
+ self.set_state(State.FAILED)
+ dag.handle_callback(self, success=False,
reason='removed_tasks',
+ session=session)
+ else:
+ self.log.info('Marking run %s successful', self)
Review comment:
I don't like that we've got duplicated else blocks here -- it makes the logic
harder to follow.
If we change the conditional to `if removed_tasks_lead_to_dagrun_failure and
tis_in_removed_state:` then the success path only needs a single `else`, which
removes the duplication and a level of indentation (deep indentation being a
sign of complex code).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services