eladkal commented on code in PR #23079:
URL: https://github.com/apache/airflow/pull/23079#discussion_r1102836004


##########
tests/www/views/test_views.py:
##########
@@ -306,6 +307,174 @@ def get_task_instance(session, task):
         assert dagrun.get_state() == State.QUEUED
 
 
[email protected](
+    "downstream, set_state, end_state",
+    [
+        (
+            True,
+            [
+                ("task_1", State.SUCCESS),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.SUCCESS),
+            ],
+            [
+                ("task_1", State.FAILED),
+                ("task_2", State.FAILED),
+                ("task_3", State.FAILED),
+            ],
+        ),
+        (
+            True,
+            [
+                ("task_1", State.SUCCESS),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.FAILED),
+            ],
+            [
+                ("task_1", State.FAILED),
+                ("task_2", State.FAILED),
+                ("task_3", State.FAILED),
+            ],
+        ),
+        (
+            False,
+            [
+                ("task_1", State.SUCCESS),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.FAILED),
+            ],
+            [
+                ("task_1", State.FAILED),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.NONE),
+            ],
+        ),
+        (
+            False,
+            [
+                ("task_1", State.FAILED),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.FAILED),
+            ],
+            [
+                ("task_1", State.FAILED),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.FAILED),
+            ],
+        ),
+        (
+            False,
+            [
+                ("task_1", State.SUCCESS),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.SUCCESS),
+            ],
+            [
+                ("task_1", State.FAILED),
+                ("task_2", State.SUCCESS),
+                ("task_3", State.SUCCESS),
+            ],
+        ),
+    ],
+    ids=[
+        "downstream[True]-SSS-FFF",
+        "downstream[True]-SSF-FFF",
+        "downstream[False]-SSF-FSN",
+        "downstream[False]-FSF-FSF",
+        "downstream[False]-SSS-FSS",
+    ],
+)
+def test_mark_task_instance_state_failed_downstream_clear(test_app, 
downstream, set_state, end_state):
+    """
+    * When downstream is True and there is no failed task
+    - Marks the given TaskInstance as FAILED.
+    - Downstream TaskInstances are marked as FAILED.
+    - Set DagRun to QUEUED and ensure its state is SUCCESS.
+
+    * When downstream is True and there is a failed task
+    - Marks the given TaskInstance as FAILED.
+    - Downstream TaskInstances are marked as FAILED and ones that are already 
FAILED should not be cleared.
+    - Set DagRun to QUEUED and ensure its state is SUCCESS.
+
+    * When downstream is False and there is no failed task
+    - Marks the given TaskInstance as FAILED.
+    - Downstream TaskInstances should not be cleared.
+    - Set DagRun to QUEUED and ensure its state is SUCCESS.
+
+    * When downstream is False and there is a failed task
+    - Marks the given TaskInstance as FAILED.
+    - Downstream TaskInstances that are already FAILED should be cleared.
+    - Set DagRun to QUEUED and ensure its state is SUCCESS.
+    """
+    from airflow.models import DAG, DagBag, TaskInstance
+    from airflow.operators.empty import EmptyOperator
+    from airflow.utils.session import create_session
+    from airflow.utils.timezone import datetime
+    from airflow.utils.types import DagRunType
+    from airflow.www.views import Airflow
+    from tests.test_utils.db import clear_db_runs
+
+    clear_db_runs()
+    start_date = datetime(2020, 1, 1)
+    with DAG("test_mark_task_instance_state_failed_downstream_clear", 
start_date=start_date) as dag:
+        task_1 = EmptyOperator(task_id="task_1")
+        task_2 = EmptyOperator(task_id="task_2")
+        task_3 = EmptyOperator(task_id="task_3")
+
+        task_1 >> task_2 >> task_3
+
+    dagrun = dag.create_dagrun(
+        start_date=start_date,
+        execution_date=start_date,
+        data_interval=(start_date, start_date),
+        state=State.SUCCESS,
+        run_type=DagRunType.SCHEDULED,
+    )
+
+    def get_task_instance(session, task_id):
+        return (
+            session.query(TaskInstance)
+            .filter(
+                TaskInstance.dag_id == dag.dag_id,
+                TaskInstance.task_id == task_id,
+                TaskInstance.execution_date == start_date,
+            )
+            .one()
+        )
+
+    with create_session() as session:
+        for (task, state) in set_state:
+            get_task_instance(session, task).state = state
+
+        session.commit()
+
+    assert dagrun.get_state() == State.SUCCESS
+    test_app.dag_bag = DagBag(dag_folder='/dev/null', include_examples=False)

Review Comment:
   ```suggestion
       test_app.dag_bag = DagBag(dag_folder="/dev/null", include_examples=False)
   ```
   
   To fix static checks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to