This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new cf0349247b6 Skip populating ti if task is deleted (#47526)
cf0349247b6 is described below

commit cf0349247b6b0fc3f8ab1bfdfe4b24ab6b7f42ab
Author: Bugra Ozturk <[email protected]>
AuthorDate: Mon Mar 10 16:50:40 2025 +0100

    Skip populating ti if task is deleted (#47526)
---
 airflow/api_fastapi/core_api/routes/ui/grid.py    |   7 +-
 tests/api_fastapi/core_api/routes/ui/test_grid.py | 135 +++++++++++++++++++++-
 2 files changed, 136 insertions(+), 6 deletions(-)

diff --git a/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow/api_fastapi/core_api/routes/ui/grid.py
index 10705bb7c93..c145e334625 100644
--- a/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -145,9 +145,12 @@ def grid_data(
     parent_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
     all_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
     for ti in task_instances:
-        # Skip the Task Instances if upstream/downstream filtering is applied
-        if task_node_map_exclude and ti.task_id not in task_node_map_exclude.keys():
+        # Skip the Task Instances if upstream/downstream filtering is applied or if task is deleted
+        if (
+            task_node_map_exclude and ti.task_id not in task_node_map_exclude.keys()
+        ) or ti.task_id not in task_node_map.keys():
             continue
+
         # Populate the Grouped Task Instances (All Task Instances except the Parent Task Instances)
         if ti.task_id in get_child_task_map(
             parent_task_id=task_node_map[ti.task_id]["parent_id"], task_node_map=task_node_map
diff --git a/tests/api_fastapi/core_api/routes/ui/test_grid.py b/tests/api_fastapi/core_api/routes/ui/test_grid.py
index 87e4697cc3a..d9705324f54 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_grid.py
@@ -38,8 +38,11 @@ pytestmark = pytest.mark.db_test
 
 DAG_ID = "test_dag"
 DAG_ID_2 = "test_dag_2"
+DAG_ID_3 = "test_dag_3"
 TASK_ID = "task"
 TASK_ID_2 = "task2"
+TASK_ID_3 = "task3"
+TASK_ID_4 = "task4"
 SUB_TASK_ID = "subtask"
 MAPPED_TASK_ID = "mapped_task"
 TASK_GROUP_ID = "task_group"
@@ -443,7 +446,8 @@ def setup(dag_maker, session=None):
     clear_db_runs()
     clear_db_dags()
     clear_db_serialized_dags()
-
+    global DAG_WITH_DELETED_TASK
+    # DAG 1
     with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
         EmptyOperator(task_id=TASK_ID)
 
@@ -459,7 +463,6 @@ def setup(dag_maker, session=None):
 
     triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST}
     logical_date = timezone.datetime(2024, 11, 30)
-
     data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
     run_1 = dag_maker.create_dagrun(
         run_id="run_1",
@@ -492,11 +495,45 @@ def setup(dag_maker, session=None):
                 ti.start_date = pendulum.DateTime(2024, 12, 30, 2, 3, 4, tzinfo=pendulum.UTC)
                 ti.end_date = None
 
-    session.flush()
-
+    # DAG 2
     with dag_maker(dag_id=DAG_ID_2, serialized=True, session=session):
         EmptyOperator(task_id=TASK_ID_2)
 
+    # DAG 3 for testing removed task
+    with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session) as dag_3:
+        EmptyOperator(task_id=TASK_ID_3)
+        EmptyOperator(task_id=TASK_ID_4)
+
+    logical_date = timezone.datetime(2024, 11, 30)
+    data_interval = dag_3.timetable.infer_manual_data_interval(run_after=logical_date)
+    run_3 = dag_maker.create_dagrun(
+        run_id="run_1",
+        state=DagRunState.SUCCESS,
+        run_type=DagRunType.SCHEDULED,
+        logical_date=logical_date,
+        data_interval=data_interval,
+        **triggered_by_kwargs,
+    )
+    run_4 = dag_maker.create_dagrun(
+        run_id="run_2",
+        run_type=DagRunType.MANUAL,
+        state=DagRunState.FAILED,
+        logical_date=logical_date + timedelta(days=1),
+        data_interval=data_interval,
+        **triggered_by_kwargs,
+    )
+    for ti in run_3.task_instances:
+        ti.state = TaskInstanceState.SUCCESS
+    for ti in sorted(run_4.task_instances, key=lambda ti: (ti.task_id, ti.map_index)):
+        if ti.task_id == TASK_ID_3 or ti.task_id == TASK_ID_4:
+            ti.state = TaskInstanceState.SUCCESS
+
+    session.flush()
+    # Serialize DAG with only one task
+    with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session):
+        EmptyOperator(task_id=TASK_ID_3)
+    session.flush()
+
 
 @pytest.fixture(autouse=True)
 def _clean():
@@ -975,3 +1012,93 @@ class TestGetGridDataEndpoint:
         response = test_client.get(f"/ui/grid/{DAG_ID_2}")
         assert response.status_code == 200
         assert response.json() == {"dag_runs": []}
+
+    def test_should_response_200_with_deleted_task(self, test_client):
+        response = test_client.get(f"/ui/grid/{DAG_ID_3}")
+        assert response.status_code == 200
+        assert response.json() == {
+            "dag_runs": [
+                {
+                    "dag_run_id": "run_1",
+                    "data_interval_end": "2024-11-30T00:00:00Z",
+                    "data_interval_start": "2024-11-29T00:00:00Z",
+                    "end_date": None,
+                    "logical_date": "2024-11-30T00:00:00Z",
+                    "note": None,
+                    "queued_at": "2024-12-31T00:00:00Z",
+                    "run_after": "2024-11-30T00:00:00Z",
+                    "run_type": "scheduled",
+                    "start_date": None,
+                    "state": "queued",
+                    "task_instances": [
+                        {
+                            "child_states": {
+                                "deferred": 0,
+                                "failed": 0,
+                                "no_status": 1,
+                                "queued": 0,
+                                "removed": 0,
+                                "restarting": 0,
+                                "running": 0,
+                                "scheduled": 0,
+                                "skipped": 0,
+                                "success": 0,
+                                "up_for_reschedule": 0,
+                                "up_for_retry": 0,
+                                "upstream_failed": 0,
+                            },
+                            "end_date": None,
+                            "note": None,
+                            "queued_dttm": None,
+                            "start_date": None,
+                            "state": None,
+                            "task_count": 1,
+                            "task_id": "task3",
+                            "try_number": 0,
+                        },
+                    ],
+                    "version_number": 1,
+                },
+                {
+                    "dag_run_id": "run_2",
+                    "data_interval_end": "2024-11-30T00:00:00Z",
+                    "data_interval_start": "2024-11-29T00:00:00Z",
+                    "end_date": None,
+                    "logical_date": "2024-12-01T00:00:00Z",
+                    "note": None,
+                    "queued_at": "2024-12-31T00:00:00Z",
+                    "run_after": "2024-11-30T00:00:00Z",
+                    "run_type": "manual",
+                    "start_date": None,
+                    "state": "queued",
+                    "task_instances": [
+                        {
+                            "child_states": {
+                                "deferred": 0,
+                                "failed": 0,
+                                "no_status": 1,
+                                "queued": 0,
+                                "removed": 0,
+                                "restarting": 0,
+                                "running": 0,
+                                "scheduled": 0,
+                                "skipped": 0,
+                                "success": 0,
+                                "up_for_reschedule": 0,
+                                "up_for_retry": 0,
+                                "upstream_failed": 0,
+                            },
+                            "end_date": None,
+                            "note": None,
+                            "queued_dttm": None,
+                            "start_date": None,
+                            "state": None,
+                            "task_count": 1,
+                            "task_id": "task3",
+                            "try_number": 0,
+                        },
+                    ],
+                    "version_number": 1,
+                },
+            ],
+        }

Reply via email to