jlricon closed pull request #4227: [AIRFLOW-3384] Allow higher versions of
Sqlalchemy and Jinja2
URL: https://github.com/apache/incubator-airflow/pull/4227
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git a/airflow/models.py b/airflow/models.py
index 95ce629d3b..9ab2348cc2 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2440,7 +2440,7 @@ class derived from this one results in the creation of a task object,
:param trigger_rule: defines the rule by which dependencies are applied
for the task to get triggered. Options are:
``{ all_success | all_failed | all_done | one_success |
- one_failed | dummy}``
+ one_failed | none_failed | dummy}``
default is ``all_success``. Options can be set as string or
using the constants defined in the static class
``airflow.utils.TriggerRule``
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 8c9505db71..f1d58d0057 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -152,6 +152,11 @@ def _evaluate_trigger_rule(
elif tr == TR.ONE_FAILED:
if upstream_done and not (failed or upstream_failed):
ti.set_state(State.SKIPPED, session)
+ elif tr == TR.NONE_FAILED:
+ if upstream_failed or failed:
+ ti.set_state(State.UPSTREAM_FAILED, session)
+ elif skipped == upstream:
+ ti.set_state(State.SKIPPED, session)
if tr == TR.ONE_SUCCESS:
if successes <= 0:
@@ -194,6 +199,15 @@ def _evaluate_trigger_rule(
"upstream_task_ids={3}"
.format(tr, upstream_done, upstream_tasks_state,
task.upstream_task_ids))
+ elif tr == TR.NONE_FAILED:
+ num_failures = upstream - successes - skipped
+ if num_failures > 0:
+ yield self._failing_status(
+ reason="Task's trigger rule '{0}' requires all upstream "
+ "tasks to have succeeded or been skipped, but found {1} non-success(es). "
+ "upstream_tasks_state={2}, upstream_task_ids={3}"
+ .format(tr, num_failures, upstream_tasks_state,
+ task.upstream_task_ids))
else:
yield self._failing_status(
reason="No strategy to evaluate trigger rule '{0}'.".format(tr))
diff --git a/airflow/utils/trigger_rule.py b/airflow/utils/trigger_rule.py
index 7fdcbc8ca8..4f7db65f7b 100644
--- a/airflow/utils/trigger_rule.py
+++ b/airflow/utils/trigger_rule.py
@@ -29,6 +29,7 @@ class TriggerRule(object):
ONE_SUCCESS = 'one_success'
ONE_FAILED = 'one_failed'
DUMMY = 'dummy'
+ NONE_FAILED = 'none_failed'
_ALL_TRIGGER_RULES = {}
diff --git a/docs/concepts.rst b/docs/concepts.rst
index 8753958af3..8a497e499c 100644
--- a/docs/concepts.rst
+++ b/docs/concepts.rst
@@ -652,6 +652,7 @@ while creating tasks:
* ``all_done``: all parents are done with their execution
* ``one_failed``: fires as soon as at least one parent has failed, it does not
wait for all parents to be done
* ``one_success``: fires as soon as at least one parent succeeds, it does not
wait for all parents to be done
+* ``none_failed``: all parents have not failed (``failed`` or ``upstream_failed``) i.e. all parents have succeeded or been skipped
* ``dummy``: dependencies are just for show, trigger at will
Note that these can be used in conjunction with ``depends_on_past`` (boolean)
diff --git a/setup.py b/setup.py
index e651f5a66e..0c21f7d9ec 100644
--- a/setup.py
+++ b/setup.py
@@ -299,7 +299,7 @@ def do_setup():
'dill>=0.2.2, <0.3',
'flask>=0.12.4, <0.13',
'flask-appbuilder==1.12.1',
- 'flask-admin==1.4.1',
+ 'flask-admin==1.5.2',
'flask-caching>=1.3.3, <1.4.0',
'flask-login>=0.3, <0.5',
'flask-swagger==0.2.13',
@@ -310,7 +310,7 @@ def do_setup():
'gunicorn>=19.4.0, <20.0',
'iso8601>=0.1.12',
'json-merge-patch==0.2',
- 'jinja2>=2.7.3, <2.9.0',
+ 'jinja2>=2.7.3, <=2.10.0',
'lxml>=4.0.0',
'markdown>=2.5.2, <3.0',
'pandas>=0.17.1, <1.0.0',
@@ -322,7 +322,7 @@ def do_setup():
'python-nvd3==0.15.0',
'requests>=2.5.1, <3',
'setproctitle>=1.1.8, <2',
- 'sqlalchemy>=1.1.15, <1.2.0',
+ 'sqlalchemy>=1.1.15, <1.3.0',
'tabulate>=0.7.5, <=0.8.2',
'tenacity==4.8.0',
'thrift>=0.9.2',
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 14dc01756c..2f3258f810 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -163,6 +163,44 @@ def test_all_success_tr_failure(self):
self.assertEqual(len(dep_statuses), 1)
self.assertFalse(dep_statuses[0].passed)
+ def test_none_failed_tr_success(self):
+ """
+ All success including skip trigger rule success
+ """
+ ti = self._get_task_instance(TriggerRule.NONE_FAILED,
+ upstream_task_ids=["FakeTaskID",
+ "OtherFakeTaskID"])
+ dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+ ti=ti,
+ successes=1,
+ skipped=1,
+ failed=0,
+ upstream_failed=0,
+ done=2,
+ flag_upstream_failed=False,
+ session="Fake Session"))
+ self.assertEqual(len(dep_statuses), 0)
+
+ def test_none_failed_tr_failure(self):
+ """
+ All success including skip trigger rule failure
+ """
+ ti = self._get_task_instance(TriggerRule.NONE_FAILED,
+ upstream_task_ids=["FakeTaskID",
+ "OtherFakeTaskID",
+ "FailedFakeTaskID"])
+ dep_statuses = tuple(TriggerRuleDep()._evaluate_trigger_rule(
+ ti=ti,
+ successes=1,
+ skipped=1,
+ failed=1,
+ upstream_failed=0,
+ done=3,
+ flag_upstream_failed=False,
+ session="Fake Session"))
+ self.assertEqual(len(dep_statuses), 1)
+ self.assertFalse(dep_statuses[0].passed)
+
def test_all_failed_tr_success(self):
"""
All-failed trigger rule success
diff --git a/tests/utils/test_trigger_rule.py b/tests/utils/test_trigger_rule.py
index f068542849..ab21edd343 100644
--- a/tests/utils/test_trigger_rule.py
+++ b/tests/utils/test_trigger_rule.py
@@ -29,5 +29,6 @@ def test_valid_trigger_rules(self):
self.assertTrue(TriggerRule.is_valid(TriggerRule.ALL_DONE))
self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_SUCCESS))
self.assertTrue(TriggerRule.is_valid(TriggerRule.ONE_FAILED))
+ self.assertTrue(TriggerRule.is_valid(TriggerRule.NONE_FAILED))
self.assertTrue(TriggerRule.is_valid(TriggerRule.DUMMY))
- self.assertEqual(len(TriggerRule.all_triggers()), 6)
+ self.assertEqual(len(TriggerRule.all_triggers()), 7)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services