This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 4b731f4407 Exclude missing tasks from the gantt view (#23627)
4b731f4407 is described below
commit 4b731f440734b7a0da1bbc8595702aaa1110ad8d
Author: Joel Ossher <[email protected]>
AuthorDate: Fri May 20 15:24:14 2022 -0400
Exclude missing tasks from the gantt view (#23627)
* Exclude missing tasks from the gantt view
Stops the gantt view from crashing if a task no longer exists
in a DAG but there are TaskInstances for that task.
* Fix tests
---
airflow/www/views.py | 4 +++
tests/www/views/test_views_graph_gantt.py | 54 +++++++++++++++++++++++++++++--
2 files changed, 55 insertions(+), 3 deletions(-)
diff --git a/airflow/www/views.py b/airflow/www/views.py
index b66c38140e..48f6a47f5e 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -3323,6 +3323,8 @@ class Airflow(AirflowBaseView):
tasks = []
for ti in tis:
+ if not dag.has_task(ti.task_id):
+ continue
# prev_attempted_tries will reflect the currently running
try_number
# or the try_number of the last complete run
# https://issues.apache.org/jira/browse/AIRFLOW-2143
@@ -3339,6 +3341,8 @@ class Airflow(AirflowBaseView):
try_count = 1
prev_task_id = ""
for failed_task_instance in ti_fails:
+ if not dag.has_task(failed_task_instance.task_id):
+ continue
if tf_count != 0 and failed_task_instance.task_id == prev_task_id:
try_count += 1
else:
diff --git a/tests/www/views/test_views_graph_gantt.py
b/tests/www/views/test_views_graph_gantt.py
index 0d549d9bbd..d124bb07be 100644
--- a/tests/www/views/test_views_graph_gantt.py
+++ b/tests/www/views/test_views_graph_gantt.py
@@ -21,10 +21,11 @@ from urllib.parse import quote
import pytest
from airflow.configuration import conf
-from airflow.models import DAG
+from airflow.models import DAG, DagRun
+from airflow.models.baseoperator import BaseOperator
from airflow.utils import timezone
-from airflow.utils.session import provide_session
-from airflow.utils.state import State
+from airflow.utils.session import create_session, provide_session
+from airflow.utils.state import State, TaskInstanceState
DAG_ID = "dag_for_testing_dt_nr_dr_form"
DEFAULT_DATE = timezone.datetime(2017, 9, 1)
@@ -297,3 +298,50 @@ def
test_uses_base_date_if_changed_away_from_execution_date(admin_client, very_c
_assert_run_is_not_in_dropdown(very_close_dagruns[1], data)
_assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data)
_assert_run_is_selected(very_close_dagruns[3], data)
+
+
[email protected]("endpoint", ENDPOINTS)
+def test_view_works_with_deleted_tasks(request, admin_client, app, endpoint):
+ task_to_state = {
+ "existing-task": TaskInstanceState.SUCCESS,
+ "deleted-task-success": TaskInstanceState.SUCCESS,
+ "deleted-task-failed": TaskInstanceState.FAILED,
+ }
+ dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+ for task_id in task_to_state.keys():
+ BaseOperator(task_id=task_id, dag=dag)
+
+ execution_date = timezone.datetime(2022, 3, 14)
+ dag_run_id = "test-deleted-tasks-dag-run"
+ with create_session() as session:
+ dag_run = dag.create_dagrun(
+ run_id=dag_run_id,
+ execution_date=execution_date,
+ data_interval=(execution_date, execution_date +
timedelta(minutes=5)),
+ state=State.SUCCESS,
+ external_trigger=True,
+ session=session,
+ )
+ for ti in dag_run.task_instances:
+ ti.refresh_from_task(dag.get_task(ti.task_id))
+ ti.state = task_to_state[ti.task_id]
+ ti.start_date = execution_date
+ ti.end_date = execution_date + timedelta(minutes=5)
+ session.merge(ti)
+
+ def cleanup_database():
+ with create_session() as session:
+ session.query(DagRun).filter_by(run_id=dag_run_id).delete()
+
+ request.addfinalizer(cleanup_database)
+
+ dag = DAG(DAG_ID, start_date=DEFAULT_DATE)
+ BaseOperator(task_id="existing-task", dag=dag)
+ app.dag_bag.bag_dag(dag=dag, root_dag=dag)
+
+ response = admin_client.get(
+ f'{endpoint}&execution_date={execution_date.isoformat()}',
+ data={"username": "test", "password": "test"},
+ follow_redirects=True,
+ )
+ assert response.status_code == 200