This is an automated email from the ASF dual-hosted git repository.
ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 32c6362 When one_success mark task as failed if no success (#15467)
32c6362 is described below
commit 32c63626845b9885c2349190ba9e44691007bbdb
Author: r-richmond <[email protected]>
AuthorDate: Tue May 4 01:52:28 2021 -0700
When one_success mark task as failed if no success (#15467)
---
airflow/ti_deps/deps/trigger_rule_dep.py | 6 +++++-
tests/models/test_taskinstance.py | 25 ++++++++++++++++---------
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 766b133..3ac3bf0 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -134,8 +134,12 @@ class TriggerRuleDep(BaseTIDep):
if successes or skipped:
ti.set_state(State.SKIPPED, session)
elif trigger_rule == TR.ONE_SUCCESS:
- if upstream_done and not successes:
+ if upstream_done and done == skipped:
+ # if upstream is done and all are skipped mark as skipped
ti.set_state(State.SKIPPED, session)
+ elif upstream_done and successes <= 0:
+ # if upstream is done and there are no successes mark as upstream failed
+ ti.set_state(State.UPSTREAM_FAILED, session)
elif trigger_rule == TR.ONE_FAILED:
if upstream_done and not (failed or upstream_failed):
ti.set_state(State.SKIPPED, session)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 8c3b100..3184992 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -905,6 +905,13 @@ class TestTaskInstance(unittest.TestCase):
['one_success', 2, 0, 0, 0, 2, True, None, True],
['one_success', 2, 0, 1, 0, 3, True, None, True],
['one_success', 2, 1, 0, 0, 3, True, None, True],
+ ['one_success', 0, 5, 0, 0, 5, True, State.SKIPPED, False],
+ ['one_success', 0, 4, 1, 0, 5, True, State.UPSTREAM_FAILED, False],
+ ['one_success', 0, 3, 1, 1, 5, True, State.UPSTREAM_FAILED, False],
+ ['one_success', 0, 4, 0, 1, 5, True, State.UPSTREAM_FAILED, False],
+ ['one_success', 0, 0, 5, 0, 5, True, State.UPSTREAM_FAILED, False],
+ ['one_success', 0, 0, 4, 1, 5, True, State.UPSTREAM_FAILED, False],
+ ['one_success', 0, 0, 0, 5, 5, True, State.UPSTREAM_FAILED, False],
#
# Tests for all_failed
#
@@ -932,15 +939,15 @@ class TestTaskInstance(unittest.TestCase):
)
def test_check_task_dependencies(
self,
- trigger_rule,
- successes,
- skipped,
- failed,
- upstream_failed,
- done,
- flag_upstream_failed,
- expect_state,
- expect_completed,
+ trigger_rule: str,
+ successes: int,
+ skipped: int,
+ failed: int,
+ upstream_failed: int,
+ done: int,
+ flag_upstream_failed: bool,
+ expect_state: State,
+ expect_completed: bool,
):
start_date = timezone.datetime(2016, 2, 1, 0, 0, 0)
dag = models.DAG('test-dag', start_date=start_date)