This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8fde9a4 Rename ``none_failed_or_skipped`` by
``none_failed_min_one_success`` trigger rule (#17683)
8fde9a4 is described below
commit 8fde9a4b5b1360f733d86a04c2b0b747f0bfbf01
Author: raphaelauv <[email protected]>
AuthorDate: Fri Aug 20 04:49:25 2021 +0200
Rename ``none_failed_or_skipped`` by ``none_failed_min_one_success``
trigger rule (#17683)
closes #17012
Co-authored-by: Kaxil Naik <[email protected]>
---
UPDATING.md | 6 ++++++
airflow/api_connexion/openapi/v1.yaml | 1 +
airflow/example_dags/example_branch_operator.py | 2 +-
airflow/example_dags/example_nested_branch_dag.py | 6 +++---
airflow/models/baseoperator.py | 11 ++++++++++-
airflow/ti_deps/deps/trigger_rule_dep.py | 4 ++--
airflow/utils/trigger_rule.py | 1 +
docs/apache-airflow/concepts/dags.rst | 4 ++--
tests/operators/test_python.py | 2 +-
tests/ti_deps/deps/test_trigger_rule_dep.py | 12 ++++++------
tests/utils/test_trigger_rule.py | 3 ++-
11 files changed, 35 insertions(+), 17 deletions(-)
diff --git a/UPDATING.md b/UPDATING.md
index cee87bb..e5a08cd 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -92,6 +92,12 @@ pip install -U "apache-airflow[pandas]==2.1.2" \
--constraint
https://raw.githubusercontent.com/apache/airflow/constraints-2.1.2/constraints-3.8.txt"
```
+### `none_failed_or_skipped` trigger rule has been deprecated
+
+`TriggerRule.NONE_FAILED_OR_SKIPPED` is replaced by
`TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS`.
+This is only a name change; no functionality changes were made.
+This change is backward compatible; however,
`TriggerRule.NONE_FAILED_OR_SKIPPED` will be removed in the next major release.
+
### Dummy trigger rule has been deprecated
`TriggerRule.DUMMY` is replaced by `TriggerRule.ALWAYS`.
diff --git a/airflow/api_connexion/openapi/v1.yaml
b/airflow/api_connexion/openapi/v1.yaml
index 485e028..9ff9484 100644
--- a/airflow/api_connexion/openapi/v1.yaml
+++ b/airflow/api_connexion/openapi/v1.yaml
@@ -3155,6 +3155,7 @@ components:
- none_failed
- none_skipped
- none_failed_or_skipped
+ - none_failed_min_one_success
- dummy
WeightRule:
diff --git a/airflow/example_dags/example_branch_operator.py
b/airflow/example_dags/example_branch_operator.py
index 7928490..ca85e6d 100644
--- a/airflow/example_dags/example_branch_operator.py
+++ b/airflow/example_dags/example_branch_operator.py
@@ -52,7 +52,7 @@ with DAG(
join = DummyOperator(
task_id='join',
- trigger_rule='none_failed_or_skipped',
+ trigger_rule='none_failed_min_one_success',
)
for option in options:
diff --git a/airflow/example_dags/example_nested_branch_dag.py
b/airflow/example_dags/example_nested_branch_dag.py
index 76141e2..d4df2bf 100644
--- a/airflow/example_dags/example_nested_branch_dag.py
+++ b/airflow/example_dags/example_nested_branch_dag.py
@@ -18,7 +18,7 @@
"""
Example DAG demonstrating a workflow with nested branching. The join tasks are
created with
-``none_failed_or_skipped`` trigger rule such that they are skipped whenever
their corresponding
+``none_failed_min_one_success`` trigger rule such that they are skipped
whenever their corresponding
``BranchPythonOperator`` are skipped.
"""
@@ -31,11 +31,11 @@ with DAG(
dag_id="example_nested_branch_dag", start_date=days_ago(2),
schedule_interval="@daily", tags=["example"]
) as dag:
branch_1 = BranchPythonOperator(task_id="branch_1",
python_callable=lambda: "true_1")
- join_1 = DummyOperator(task_id="join_1",
trigger_rule="none_failed_or_skipped")
+ join_1 = DummyOperator(task_id="join_1",
trigger_rule="none_failed_min_one_success")
true_1 = DummyOperator(task_id="true_1")
false_1 = DummyOperator(task_id="false_1")
branch_2 = BranchPythonOperator(task_id="branch_2",
python_callable=lambda: "true_2")
- join_2 = DummyOperator(task_id="join_2",
trigger_rule="none_failed_or_skipped")
+ join_2 = DummyOperator(task_id="join_2",
trigger_rule="none_failed_min_one_success")
true_2 = DummyOperator(task_id="true_2")
false_2 = DummyOperator(task_id="false_2")
false_3 = DummyOperator(task_id="false_3")
diff --git a/airflow/models/baseoperator.py b/airflow/models/baseoperator.py
index 78a0fc9..a131640 100644
--- a/airflow/models/baseoperator.py
+++ b/airflow/models/baseoperator.py
@@ -350,7 +350,7 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin,
metaclass=BaseOperatorMeta
:param trigger_rule: defines the rule by which dependencies are applied
for the task to get triggered. Options are:
``{ all_success | all_failed | all_done | one_success |
- one_failed | none_failed | none_failed_or_skipped | none_skipped |
always}``
+ one_failed | none_failed | none_failed_min_one_success | none_skipped
| always}``
default is ``all_success``. Options can be set as string or
using the constants defined in the static class
``airflow.utils.TriggerRule``
@@ -553,6 +553,15 @@ class BaseOperator(Operator, LoggingMixin, TaskMixin,
metaclass=BaseOperatorMeta
)
trigger_rule = TriggerRule.ALWAYS
+ if trigger_rule == "none_failed_or_skipped":
+ warnings.warn(
+ "none_failed_or_skipped Trigger Rule is deprecated. "
+ "Please use `none_failed_min_one_success`.",
+ DeprecationWarning,
+ stacklevel=2,
+ )
+ trigger_rule = TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS
+
if not TriggerRule.is_valid(trigger_rule):
raise AirflowException(
"The trigger_rule must be one of {all_triggers},"
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py
b/airflow/ti_deps/deps/trigger_rule_dep.py
index cc1672f..f04cf31 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -146,7 +146,7 @@ class TriggerRuleDep(BaseTIDep):
elif trigger_rule == TR.NONE_FAILED:
if upstream_failed or failed:
ti.set_state(State.UPSTREAM_FAILED, session)
- elif trigger_rule == TR.NONE_FAILED_OR_SKIPPED:
+ elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
if upstream_failed or failed:
ti.set_state(State.UPSTREAM_FAILED, session)
elif skipped == upstream:
@@ -213,7 +213,7 @@ class TriggerRuleDep(BaseTIDep):
trigger_rule, num_failures, upstream_tasks_state,
task.upstream_task_ids
)
)
- elif trigger_rule == TR.NONE_FAILED_OR_SKIPPED:
+ elif trigger_rule == TR.NONE_FAILED_MIN_ONE_SUCCESS:
num_failures = upstream - successes - skipped
if num_failures > 0:
yield self._failing_status(
diff --git a/airflow/utils/trigger_rule.py b/airflow/utils/trigger_rule.py
index c9ac00a..890bdc7 100644
--- a/airflow/utils/trigger_rule.py
+++ b/airflow/utils/trigger_rule.py
@@ -32,6 +32,7 @@ class TriggerRule:
NONE_SKIPPED = 'none_skipped'
DUMMY = 'dummy'
ALWAYS = 'always'
+ NONE_FAILED_MIN_ONE_SUCCESS = "none_failed_min_one_success"
_ALL_TRIGGER_RULES: Set[str] = set()
diff --git a/docs/apache-airflow/concepts/dags.rst
b/docs/apache-airflow/concepts/dags.rst
index f871528..6133a47 100644
--- a/docs/apache-airflow/concepts/dags.rst
+++ b/docs/apache-airflow/concepts/dags.rst
@@ -335,7 +335,7 @@ However, this is just the default behaviour, and you can
control it using the ``
* ``one_failed``: At least one upstream task has failed (does not wait for all
upstream tasks to be done)
* ``one_success``: At least one upstream task has succeeded (does not wait for
all upstream tasks to be done)
* ``none_failed``: All upstream tasks have not ``failed`` or
``upstream_failed`` - that is, all upstream tasks have succeeded or been skipped
-* ``none_failed_or_skipped``: All upstream tasks have not ``failed`` or
``upstream_failed``, and at least one upstream task has succeeded.
+* ``none_failed_min_one_success``: All upstream tasks have not ``failed`` or
``upstream_failed``, and at least one upstream task has succeeded.
* ``none_skipped``: No upstream task is in a ``skipped`` state - that is, all
upstream tasks are in a ``success``, ``failed``, or ``upstream_failed`` state
* ``always``: No dependencies at all, run this task at any time
@@ -382,7 +382,7 @@ You can also combine this with the
:ref:`concepts:depends-on-past` functionality
.. image:: /img/branch_without_trigger.png
- By setting ``trigger_rule`` to ``none_failed_or_skipped`` in the ``join``
task, we can instead get the intended behaviour:
+ By setting ``trigger_rule`` to ``none_failed_min_one_success`` in the
``join`` task, we can instead get the intended behaviour:
.. image:: /img/branch_with_trigger.png
diff --git a/tests/operators/test_python.py b/tests/operators/test_python.py
index 82fbd22..9357aff 100644
--- a/tests/operators/test_python.py
+++ b/tests/operators/test_python.py
@@ -1154,7 +1154,7 @@ def test_empty_branch(choice, expected_states):
) as dag:
branch = BranchPythonOperator(task_id='branch',
python_callable=lambda: choice)
task1 = DummyOperator(task_id='task1')
- join = DummyOperator(task_id='join',
trigger_rule="none_failed_or_skipped")
+ join = DummyOperator(task_id='join',
trigger_rule="none_failed_min_one_success")
branch >> [task1, join]
task1 >> join
diff --git a/tests/ti_deps/deps/test_trigger_rule_dep.py
b/tests/ti_deps/deps/test_trigger_rule_dep.py
index 8a1247b..fee1e1f 100644
--- a/tests/ti_deps/deps/test_trigger_rule_dep.py
+++ b/tests/ti_deps/deps/test_trigger_rule_dep.py
@@ -299,12 +299,12 @@ class TestTriggerRuleDep(unittest.TestCase):
assert len(dep_statuses) == 1
assert not dep_statuses[0].passed
- def test_none_failed_or_skipped_tr_success(self):
+ def test_none_failed_min_one_success_tr_success(self):
"""
All success including skip trigger rule success
"""
ti = self._get_task_instance(
- TriggerRule.NONE_FAILED_OR_SKIPPED,
upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]
+ TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]
)
dep_statuses = tuple(
TriggerRuleDep()._evaluate_trigger_rule(
@@ -320,12 +320,12 @@ class TestTriggerRuleDep(unittest.TestCase):
)
assert len(dep_statuses) == 0
- def test_none_failed_or_skipped_tr_skipped(self):
+ def test_none_failed_min_one_success_tr_skipped(self):
"""
All success including all upstream skips trigger rule success
"""
ti = self._get_task_instance(
- TriggerRule.NONE_FAILED_OR_SKIPPED,
upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]
+ TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
upstream_task_ids=["FakeTaskID", "OtherFakeTaskID"]
)
dep_statuses = tuple(
TriggerRuleDep()._evaluate_trigger_rule(
@@ -342,12 +342,12 @@ class TestTriggerRuleDep(unittest.TestCase):
assert len(dep_statuses) == 0
assert ti.state == State.SKIPPED
- def test_none_failed_or_skipped_tr_failure(self):
+ def test_none_failed_min_one_success_tr_failure(self):
"""
All success including skip trigger rule failure
"""
ti = self._get_task_instance(
- TriggerRule.NONE_FAILED_OR_SKIPPED,
+ TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS,
upstream_task_ids=["FakeTaskID", "OtherFakeTaskID",
"FailedFakeTaskID"],
)
dep_statuses = tuple(
diff --git a/tests/utils/test_trigger_rule.py b/tests/utils/test_trigger_rule.py
index 9a03808..5132c34 100644
--- a/tests/utils/test_trigger_rule.py
+++ b/tests/utils/test_trigger_rule.py
@@ -33,4 +33,5 @@ class TestTriggerRule(unittest.TestCase):
assert TriggerRule.is_valid(TriggerRule.NONE_SKIPPED)
assert TriggerRule.is_valid(TriggerRule.DUMMY)
assert TriggerRule.is_valid(TriggerRule.ALWAYS)
- assert len(TriggerRule.all_triggers()) == 10
+ assert TriggerRule.is_valid(TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)
+ assert len(TriggerRule.all_triggers()) == 11