This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c6362  When one_success mark task as failed if no success (#15467)
32c6362 is described below

commit 32c63626845b9885c2349190ba9e44691007bbdb
Author: r-richmond <[email protected]>
AuthorDate: Tue May 4 01:52:28 2021 -0700

    When one_success mark task as failed if no success (#15467)
---
 airflow/ti_deps/deps/trigger_rule_dep.py |  6 +++++-
 tests/models/test_taskinstance.py        | 25 ++++++++++++++++---------
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/airflow/ti_deps/deps/trigger_rule_dep.py b/airflow/ti_deps/deps/trigger_rule_dep.py
index 766b133..3ac3bf0 100644
--- a/airflow/ti_deps/deps/trigger_rule_dep.py
+++ b/airflow/ti_deps/deps/trigger_rule_dep.py
@@ -134,8 +134,12 @@ class TriggerRuleDep(BaseTIDep):
                 if successes or skipped:
                     ti.set_state(State.SKIPPED, session)
             elif trigger_rule == TR.ONE_SUCCESS:
-                if upstream_done and not successes:
+                if upstream_done and done == skipped:
+                    # if upstream is done and all are skipped mark as skipped
                     ti.set_state(State.SKIPPED, session)
+                elif upstream_done and successes <= 0:
+                    # if upstream is done and there are no successes mark as upstream failed
+                    ti.set_state(State.UPSTREAM_FAILED, session)
             elif trigger_rule == TR.ONE_FAILED:
                 if upstream_done and not (failed or upstream_failed):
                     ti.set_state(State.SKIPPED, session)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 8c3b100..3184992 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -905,6 +905,13 @@ class TestTaskInstance(unittest.TestCase):
             ['one_success', 2, 0, 0, 0, 2, True, None, True],
             ['one_success', 2, 0, 1, 0, 3, True, None, True],
             ['one_success', 2, 1, 0, 0, 3, True, None, True],
+            ['one_success', 0, 5, 0, 0, 5, True, State.SKIPPED, False],
+            ['one_success', 0, 4, 1, 0, 5, True, State.UPSTREAM_FAILED, False],
+            ['one_success', 0, 3, 1, 1, 5, True, State.UPSTREAM_FAILED, False],
+            ['one_success', 0, 4, 0, 1, 5, True, State.UPSTREAM_FAILED, False],
+            ['one_success', 0, 0, 5, 0, 5, True, State.UPSTREAM_FAILED, False],
+            ['one_success', 0, 0, 4, 1, 5, True, State.UPSTREAM_FAILED, False],
+            ['one_success', 0, 0, 0, 5, 5, True, State.UPSTREAM_FAILED, False],
             #
             # Tests for all_failed
             #
@@ -932,15 +939,15 @@ class TestTaskInstance(unittest.TestCase):
     )
     def test_check_task_dependencies(
         self,
-        trigger_rule,
-        successes,
-        skipped,
-        failed,
-        upstream_failed,
-        done,
-        flag_upstream_failed,
-        expect_state,
-        expect_completed,
+        trigger_rule: str,
+        successes: int,
+        skipped: int,
+        failed: int,
+        upstream_failed: int,
+        done: int,
+        flag_upstream_failed: bool,
+        expect_state: State,
+        expect_completed: bool,
     ):
         start_date = timezone.datetime(2016, 2, 1, 0, 0, 0)
         dag = models.DAG('test-dag', start_date=start_date)

Reply via email to