This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b731f4407 Exclude missing tasks from the gantt view (#23627)
4b731f4407 is described below

commit 4b731f440734b7a0da1bbc8595702aaa1110ad8d
Author: Joel Ossher <[email protected]>
AuthorDate: Fri May 20 15:24:14 2022 -0400

    Exclude missing tasks from the gantt view (#23627)
    
    * Exclude missing tasks from the gantt view
    
    Stops the gantt view from crashing if a task no longer exists
    in a DAG but there are TaskInstances for that task.
    
    * Fix tests
---
 airflow/www/views.py                      |  4 +++
 tests/www/views/test_views_graph_gantt.py | 54 +++++++++++++++++++++++++++++--
 2 files changed, 55 insertions(+), 3 deletions(-)

diff --git a/airflow/www/views.py b/airflow/www/views.py
index b66c38140e..48f6a47f5e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3323,6 +3323,8 @@ class Airflow(AirflowBaseView):
 
         tasks = []
         for ti in tis:
+            if not dag.has_task(ti.task_id):
+                continue
             # prev_attempted_tries will reflect the currently running 
try_number
             # or the try_number of the last complete run
             # https://issues.apache.org/jira/browse/AIRFLOW-2143
@@ -3339,6 +3341,8 @@ class Airflow(AirflowBaseView):
         try_count = 1
         prev_task_id = ""
         for failed_task_instance in ti_fails:
+            if not dag.has_task(failed_task_instance.task_id):
+                continue
             if tf_count != 0 and failed_task_instance.task_id == prev_task_id:
                 try_count += 1
             else:
diff --git a/tests/www/views/test_views_graph_gantt.py 
b/tests/www/views/test_views_graph_gantt.py
index 0d549d9bbd..d124bb07be 100644
--- a/tests/www/views/test_views_graph_gantt.py
+++ b/tests/www/views/test_views_graph_gantt.py
@@ -21,10 +21,11 @@ from urllib.parse import quote
 import pytest
 
 from airflow.configuration import conf
-from airflow.models import DAG
+from airflow.models import DAG, DagRun
+from airflow.models.baseoperator import BaseOperator
 from airflow.utils import timezone
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.state import State, TaskInstanceState
 
 DAG_ID = "dag_for_testing_dt_nr_dr_form"
 DEFAULT_DATE = timezone.datetime(2017, 9, 1)
@@ -297,3 +298,50 @@ def 
test_uses_base_date_if_changed_away_from_execution_date(admin_client, very_c
     _assert_run_is_not_in_dropdown(very_close_dagruns[1], data)
     _assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data)
     _assert_run_is_selected(very_close_dagruns[3], data)
+
+
[email protected]("endpoint", ENDPOINTS)
+def test_view_works_with_deleted_tasks(request, admin_client, app, endpoint):
+    task_to_state = {
+        "existing-task": TaskInstanceState.SUCCESS,
+        "deleted-task-success": TaskInstanceState.SUCCESS,
+        "deleted-task-failed": TaskInstanceState.FAILED,
+    }
+    dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+    for task_id in task_to_state.keys():
+        BaseOperator(task_id=task_id, dag=dag)
+
+    execution_date = timezone.datetime(2022, 3, 14)
+    dag_run_id = "test-deleted-tasks-dag-run"
+    with create_session() as session:
+        dag_run = dag.create_dagrun(
+            run_id=dag_run_id,
+            execution_date=execution_date,
+            data_interval=(execution_date, execution_date + 
timedelta(minutes=5)),
+            state=State.SUCCESS,
+            external_trigger=True,
+            session=session,
+        )
+        for ti in dag_run.task_instances:
+            ti.refresh_from_task(dag.get_task(ti.task_id))
+            ti.state = task_to_state[ti.task_id]
+            ti.start_date = execution_date
+            ti.end_date = execution_date + timedelta(minutes=5)
+            session.merge(ti)
+
+    def cleanup_database():
+        with create_session() as session:
+            session.query(DagRun).filter_by(run_id=dag_run_id).delete()
+
+    request.addfinalizer(cleanup_database)
+
+    dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+    BaseOperator(task_id="existing-task", dag=dag)
+    app.dag_bag.bag_dag(dag=dag, root_dag=dag)
+
+    response = admin_client.get(
+        f'{endpoint}&execution_date={execution_date.isoformat()}',
+        data={"username": "test", "password": "test"},
+        follow_redirects=True,
+    )
+    assert response.status_code == 200

Reply via email to