This is an automated email from the ASF dual-hosted git repository. jedcunningham pushed a commit to branch v2-3-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 4ba96b836b2838e54d94713fd1f87f7b25378a5d Author: Joel Ossher <[email protected]> AuthorDate: Fri May 20 15:24:14 2022 -0400 Exclude missing tasks from the gantt view (#23627) * Exclude missing tasks from the gantt view Stops the gantt view from crashing if a task no longer exists in a DAG but there are TaskInstances for that task. * Fix tests (cherry picked from commit 4b731f440734b7a0da1bbc8595702aaa1110ad8d) --- airflow/www/views.py | 4 +++ tests/www/views/test_views_graph_gantt.py | 54 +++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/airflow/www/views.py b/airflow/www/views.py index 10c53e3025..2ab0067bb2 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -3322,6 +3322,8 @@ class Airflow(AirflowBaseView): tasks = [] for ti in tis: + if not dag.has_task(ti.task_id): + continue # prev_attempted_tries will reflect the currently running try_number # or the try_number of the last complete run # https://issues.apache.org/jira/browse/AIRFLOW-2143 @@ -3338,6 +3340,8 @@ class Airflow(AirflowBaseView): try_count = 1 prev_task_id = "" for failed_task_instance in ti_fails: + if not dag.has_task(failed_task_instance.task_id): + continue if tf_count != 0 and failed_task_instance.task_id == prev_task_id: try_count += 1 else: diff --git a/tests/www/views/test_views_graph_gantt.py b/tests/www/views/test_views_graph_gantt.py index 0d549d9bbd..d124bb07be 100644 --- a/tests/www/views/test_views_graph_gantt.py +++ b/tests/www/views/test_views_graph_gantt.py @@ -21,10 +21,11 @@ from urllib.parse import quote import pytest from airflow.configuration import conf -from airflow.models import DAG +from airflow.models import DAG, DagRun +from airflow.models.baseoperator import BaseOperator from airflow.utils import timezone -from airflow.utils.session import provide_session -from airflow.utils.state import State +from airflow.utils.session import create_session, provide_session +from airflow.utils.state import State, 
TaskInstanceState DAG_ID = "dag_for_testing_dt_nr_dr_form" DEFAULT_DATE = timezone.datetime(2017, 9, 1) @@ -297,3 +298,50 @@ def test_uses_base_date_if_changed_away_from_execution_date(admin_client, very_c _assert_run_is_not_in_dropdown(very_close_dagruns[1], data) _assert_run_is_in_dropdown_not_selected(very_close_dagruns[2], data) _assert_run_is_selected(very_close_dagruns[3], data) + + +@pytest.mark.parametrize("endpoint", ENDPOINTS) +def test_view_works_with_deleted_tasks(request, admin_client, app, endpoint): + task_to_state = { + "existing-task": TaskInstanceState.SUCCESS, + "deleted-task-success": TaskInstanceState.SUCCESS, + "deleted-task-failed": TaskInstanceState.FAILED, + } + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + for task_id in task_to_state.keys(): + BaseOperator(task_id=task_id, dag=dag) + + execution_date = timezone.datetime(2022, 3, 14) + dag_run_id = "test-deleted-tasks-dag-run" + with create_session() as session: + dag_run = dag.create_dagrun( + run_id=dag_run_id, + execution_date=execution_date, + data_interval=(execution_date, execution_date + timedelta(minutes=5)), + state=State.SUCCESS, + external_trigger=True, + session=session, + ) + for ti in dag_run.task_instances: + ti.refresh_from_task(dag.get_task(ti.task_id)) + ti.state = task_to_state[ti.task_id] + ti.start_date = execution_date + ti.end_date = execution_date + timedelta(minutes=5) + session.merge(ti) + + def cleanup_database(): + with create_session() as session: + session.query(DagRun).filter_by(run_id=dag_run_id).delete() + + request.addfinalizer(cleanup_database) + + dag = DAG(DAG_ID, start_date=DEFAULT_DATE) + BaseOperator(task_id="existing-task", dag=dag) + app.dag_bag.bag_dag(dag=dag, root_dag=dag) + + response = admin_client.get( + f'{endpoint}&execution_date={execution_date.isoformat()}', + data={"username": "test", "password": "test"}, + follow_redirects=True, + ) + assert response.status_code == 200
