[ https://issues.apache.org/jira/browse/AIRFLOW-3336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16697432#comment-16697432 ]
ASF GitHub Bot commented on AIRFLOW-3336:
-----------------------------------------
kaxil closed pull request #4182: [AIRFLOW-3336] Add new TriggerRule that will
consider skipped ancestors as success
URL: https://github.com/apache/incubator-airflow/pull/4182
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/models.py b/airflow/models.py
index bb068499fe..df349ac996 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2440,7 +2440,7 @@ class derived from this one results in the creation of a task object,
:param trigger_rule: defines the rule by which dependencies are applied
for the task to get triggered. Options are:
``{ all_success | all_failed | all_done | one_success |
- one_failed | dummy}``
+ one_failed | none_failed | dummy}``
default is ``all_success``. Options can be set as string or
using the constants defined in the static class
``airflow.utils.TriggerRule``
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 8c9505db71..f1d58d0057 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -152,6 +152,11 @@ def _evaluate_trigger_rule(
elif tr == TR.ONE_FAILED:
if upstream_done and not (failed or upstream_failed):
ti.set_state(State.SKIPPED, session)
+ elif tr == TR.NONE_FAILED:
+ if upstream_failed or failed:
+ ti.set_state(State.UPSTREAM_FAILED, session)
+ elif skipped == upstream:
+ ti.set_state(State.SKIPPED, session)
if tr == TR.ONE_SUCCESS:
if successes <= 0:
@@ -194,6 +199,15 @@ def _evaluate_trigger_rule(
"upstream_task_ids={3}"
.format(tr, upstream_done, upstream_tasks_state,
task.upstream_task_ids))
+ elif tr == TR.NONE_FAILED:
+ num_failures = upstream - successes - skipped
+ if num_failures > 0:
+ yield self._failing_status(
+ reason="Task's trigger rule '{0}' requires all upstream "
+ "tasks to have succeeded or been skipped, but found {1} non-success(es). "
+ "upstream_tasks_state={2}, upstream_task_ids={3}"
+ .format(tr, num_failures, upstream_tasks_state,
+ task.upstream_task_ids))
else:
yield self._failing_status(
reason="No strategy to evaluate trigger rule '{0}'.".format(tr))
diff --git a/airflow/utils/trigger_rule.py b/airflow/utils/trigger_rule.py
index 7fdcbc8ca8..4f7db65f7b 100644
--- a/airflow/utils/trigger_rule.py
+++ b/airflow/utils/trigger_rule.py
@@ -29,6 +29,7 @@ class TriggerRule(object):
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
+ NONE_FAILED = 'none_failed'
_ALL_TRIGGER_RULES = {}
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 2896010248..13aa508f1d 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -652,6 +652,7 @@ while creating tasks:
* ``all_done``: all parents are done with their execution
* ``one_failed``: fires as soon as at least one parent has failed, it does not wait for all parents to be done
* ``one_success``: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
+* ``none_failed``: all parents have not failed (``failed`` or ``upstream_failed``) i.e. all parents have succeeded or been skipped
* ``dummy``: dependencies are just for show, trigger at will
Note that these can be used in conjunction with ``depends_on_past`` (boolean)
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 14dc01756c..2f3258f810 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -163,6 +163,44 @@ def test_all_success_tr_failure(self):
self.assertEqual(len(dep_statuses), 1)
self.assertFalse(dep_statuses[0].passed)
+ def test_none_failed_tr_success(self):
+ """
+ All success including skip trigger rule success
+ """
+ ti = self._get_task_instance(TriggerRule.NONE_FAILED,
+ upstream_task_ids=["FakeTaskID",
+ "OtherFakeTaskID"])
+ dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+ ti=ti,
+ successes=1,
+ skipped=1,
+ failed=0,
+ upstream_failed=0,
+ done=2,
+ flag_upstream_failed=False,
+ session="Fake Session"))
+ self.assertEqual(len(dep_statuses), 0)
+
+ def test_none_failed_tr_failure(self):
+ """
+ All success including skip trigger rule failure
+ """
+ ti = self._get_task_instance(TriggerRule.NONE_FAILED,
+ upstream_task_ids=["FakeTaskID",
+ "OtherFakeTaskID",
+ "FailedFakeTaskID"])
+ dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+ ti=ti,
+ successes=1,
+ skipped=1,
+ failed=1,
+ upstream_failed=0,
+ done=3,
+ flag_upstream_failed=False,
+ session="Fake Session"))
+ self.assertEqual(len(dep_statuses), 1)
+ self.assertFalse(dep_statuses[0].passed)
+
def test_all_failed_tr_success(self):
"""
All-failed trigger rule success
diff --git a/tests/utils/test_trigger_rule.py b/tests/utils/test_trigger_rule.py
index f068542849..ab21edd343 100644
--- a/tests/utils/test_trigger_rule.py
+++ b/tests/utils/test_trigger_rule.py
@@ -29,5 +29,6 @@ def test_valid_trigger_rules(self):
self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_DONE))
self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_SUCCESS))
self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_FAILED))
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.NONE_FAILED))
self.assertTrue(TriggerRule.is_valid(TriggerRule.DUMMY))
- self.assertEqual(len(TriggerRule.all_triggers()), 6)
+ self.assertEqual(len(TriggerRule.all_triggers()), 7)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Add ability for "skipped" state to be considered success
> --------------------------------------------------------
>
> Key: AIRFLOW-3336
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3336
> Project: Apache Airflow
> Issue Type: Improvement
> Components: DAG
> Reporter: Ryan Nowacoski
> Assignee: Ryan Nowacoski
> Priority: Trivial
> Labels: beginner, usability
>
> Take the case where a task has 2 or more upstream parents and 1 or more of
> them can be skipped. If TriggerRule ALL_DONE is used, then the task will trigger
> even when upstream tasks fail. However if TriggerRule ALL_SUCCESS is used the
> task won't be triggered if any upstream are skipped. This creates a gap in
> functionality where it is necessary for "skipped" to be treated as "success"
> so that the task only runs if all parents succeed or are skipped. Said
> another way, this allows tasks to be run if none of their ancestors fail.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)