This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new cf0349247b6 Skip populating ti if task is deleted (#47526)
cf0349247b6 is described below
commit cf0349247b6b0fc3f8ab1bfdfe4b24ab6b7f42ab
Author: Bugra Ozturk <[email protected]>
AuthorDate: Mon Mar 10 16:50:40 2025 +0100
Skip populating ti if task is deleted (#47526)
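
Re-serializing a DAG without a task that already has recorded task instances
would previously make grid_data raise a KeyError when it looked the orphaned
task_id up in task_node_map. The loop now also skips task instances whose task
no longer exists in the serialized DAG, alongside those excluded by
upstream/downstream filtering.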
---
airflow/api_fastapi/core_api/routes/ui/grid.py | 7 +-
tests/api_fastapi/core_api/routes/ui/test_grid.py | 135 +++++++++++++++++++++-
2 files changed, 136 insertions(+), 6 deletions(-)
diff --git a/airflow/api_fastapi/core_api/routes/ui/grid.py b/airflow/api_fastapi/core_api/routes/ui/grid.py
index 10705bb7c93..c145e334625 100644
--- a/airflow/api_fastapi/core_api/routes/ui/grid.py
+++ b/airflow/api_fastapi/core_api/routes/ui/grid.py
@@ -145,9 +145,12 @@ def grid_data(
parent_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
all_tis: dict[tuple[str, str], list] = collections.defaultdict(list)
for ti in task_instances:
- # Skip the Task Instances if upstream/downstream filtering is applied
- if task_node_map_exclude and ti.task_id not in task_node_map_exclude.keys():
+ # Skip the Task Instances if upstream/downstream filtering is applied or if task is deleted
+ if (
+ task_node_map_exclude and ti.task_id not in task_node_map_exclude.keys()
+ ) or ti.task_id not in task_node_map.keys():
continue
+
# Populate the Grouped Task Instances (All Task Instances except the Parent Task Instances)
if ti.task_id in get_child_task_map(
parent_task_id=task_node_map[ti.task_id]["parent_id"], task_node_map=task_node_map
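
For reference, a minimal standalone sketch of the new guard. task_node_map and
task_node_map_exclude here are simplified stand-ins for the structures grid.py
builds, and TI is a hypothetical placeholder for a task-instance row, not the
real model:

import collections
from dataclasses import dataclass

@dataclass
class TI:
    # Hypothetical stand-in for a TaskInstance row
    task_id: str
    run_id: str

def keep_tis(task_instances, task_node_map, task_node_map_exclude=None):
    # Skip TIs excluded by upstream/downstream filtering, and TIs whose
    # task is no longer in the serialized DAG (i.e. the task was deleted)
    kept = collections.defaultdict(list)
    for ti in task_instances:
        if (
            task_node_map_exclude and ti.task_id not in task_node_map_exclude
        ) or ti.task_id not in task_node_map:
            continue
        kept[(ti.task_id, ti.run_id)].append(ti)
    return kept

# "task4" is missing from the re-serialized DAG, so its TI is dropped
node_map = {"task3": {"parent_id": None}}
tis = [TI("task3", "run_1"), TI("task4", "run_1")]
assert list(keep_tis(tis, node_map)) == [("task3", "run_1")]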
diff --git a/tests/api_fastapi/core_api/routes/ui/test_grid.py b/tests/api_fastapi/core_api/routes/ui/test_grid.py
index 87e4697cc3a..d9705324f54 100644
--- a/tests/api_fastapi/core_api/routes/ui/test_grid.py
+++ b/tests/api_fastapi/core_api/routes/ui/test_grid.py
@@ -38,8 +38,11 @@ pytestmark = pytest.mark.db_test
DAG_ID = "test_dag"
DAG_ID_2 = "test_dag_2"
+DAG_ID_3 = "test_dag_3"
TASK_ID = "task"
TASK_ID_2 = "task2"
+TASK_ID_3 = "task3"
+TASK_ID_4 = "task4"
SUB_TASK_ID = "subtask"
MAPPED_TASK_ID = "mapped_task"
TASK_GROUP_ID = "task_group"
@@ -443,7 +446,8 @@ def setup(dag_maker, session=None):
clear_db_runs()
clear_db_dags()
clear_db_serialized_dags()
-
+ global DAG_WITH_DELETED_TASK
+ # DAG 1
with dag_maker(dag_id=DAG_ID, serialized=True, session=session) as dag:
EmptyOperator(task_id=TASK_ID)
@@ -459,7 +463,6 @@ def setup(dag_maker, session=None):
triggered_by_kwargs = {"triggered_by": DagRunTriggeredByType.TEST}
logical_date = timezone.datetime(2024, 11, 30)
-
data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
run_1 = dag_maker.create_dagrun(
run_id="run_1",
@@ -492,11 +495,45 @@ def setup(dag_maker, session=None):
ti.start_date = pendulum.DateTime(2024, 12, 30, 2, 3, 4, tzinfo=pendulum.UTC)
ti.end_date = None
- session.flush()
-
+ # DAG 2
with dag_maker(dag_id=DAG_ID_2, serialized=True, session=session):
EmptyOperator(task_id=TASK_ID_2)
+ # DAG 3 for testing removed task
+ with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session) as dag_3:
+ EmptyOperator(task_id=TASK_ID_3)
+ EmptyOperator(task_id=TASK_ID_4)
+
+ logical_date = timezone.datetime(2024, 11, 30)
+ data_interval = dag_3.timetable.infer_manual_data_interval(run_after=logical_date)
+ run_3 = dag_maker.create_dagrun(
+ run_id="run_1",
+ state=DagRunState.SUCCESS,
+ run_type=DagRunType.SCHEDULED,
+ logical_date=logical_date,
+ data_interval=data_interval,
+ **triggered_by_kwargs,
+ )
+ run_4 = dag_maker.create_dagrun(
+ run_id="run_2",
+ run_type=DagRunType.MANUAL,
+ state=DagRunState.FAILED,
+ logical_date=logical_date + timedelta(days=1),
+ data_interval=data_interval,
+ **triggered_by_kwargs,
+ )
+ for ti in run_3.task_instances:
+ ti.state = TaskInstanceState.SUCCESS
+ for ti in sorted(run_4.task_instances, key=lambda ti: (ti.task_id, ti.map_index)):
+ if ti.task_id == TASK_ID_3 or ti.task_id == TASK_ID_4:
+ ti.state = TaskInstanceState.SUCCESS
+
+ session.flush()
+ # Serialize DAG with only one task
+ with dag_maker(dag_id=DAG_ID_3, serialized=True, session=session):
+ EmptyOperator(task_id=TASK_ID_3)
+ session.flush()
+
@pytest.fixture(autouse=True)
def _clean():
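
The second dag_maker block above is what simulates the deletion: DAG_ID_3 is
re-serialized with only task3, so the task instances already created for task4
in run_1 and run_2 now reference a task the latest serialized DAG no longer
contains. Schematically (hypothetical names, just to illustrate the resulting
state):

# Tasks in the latest serialized version of DAG 3
serialized_tasks = {"task3"}
# Tasks that still have TaskInstance rows from the two runs
ti_task_ids = {"task3", "task4"}
# TIs for these tasks are the ones the new guard skips
orphaned = ti_task_ids - serialized_tasks
assert orphaned == {"task4"}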
@@ -975,3 +1012,93 @@ class TestGetGridDataEndpoint:
response = test_client.get(f"/ui/grid/{DAG_ID_2}")
assert response.status_code == 200
assert response.json() == {"dag_runs": []}
+
+ def test_should_response_200_with_deleted_task(self, test_client):
+ response = test_client.get(f"/ui/grid/{DAG_ID_3}")
+ assert response.status_code == 200
+ assert response.json() == {
+ "dag_runs": [
+ {
+ "dag_run_id": "run_1",
+ "data_interval_end": "2024-11-30T00:00:00Z",
+ "data_interval_start": "2024-11-29T00:00:00Z",
+ "end_date": None,
+ "logical_date": "2024-11-30T00:00:00Z",
+ "note": None,
+ "queued_at": "2024-12-31T00:00:00Z",
+ "run_after": "2024-11-30T00:00:00Z",
+ "run_type": "scheduled",
+ "start_date": None,
+ "state": "queued",
+ "task_instances": [
+ {
+ "child_states": {
+ "deferred": 0,
+ "failed": 0,
+ "no_status": 1,
+ "queued": 0,
+ "removed": 0,
+ "restarting": 0,
+ "running": 0,
+ "scheduled": 0,
+ "skipped": 0,
+ "success": 0,
+ "up_for_reschedule": 0,
+ "up_for_retry": 0,
+ "upstream_failed": 0,
+ },
+ "end_date": None,
+ "note": None,
+ "queued_dttm": None,
+ "start_date": None,
+ "state": None,
+ "task_count": 1,
+ "task_id": "task3",
+ "try_number": 0,
+ },
+ ],
+ "version_number": 1,
+ },
+ {
+ "dag_run_id": "run_2",
+ "data_interval_end": "2024-11-30T00:00:00Z",
+ "data_interval_start": "2024-11-29T00:00:00Z",
+ "end_date": None,
+ "logical_date": "2024-12-01T00:00:00Z",
+ "note": None,
+ "queued_at": "2024-12-31T00:00:00Z",
+ "run_after": "2024-11-30T00:00:00Z",
+ "run_type": "manual",
+ "start_date": None,
+ "state": "queued",
+ "task_instances": [
+ {
+ "child_states": {
+ "deferred": 0,
+ "failed": 0,
+ "no_status": 1,
+ "queued": 0,
+ "removed": 0,
+ "restarting": 0,
+ "running": 0,
+ "scheduled": 0,
+ "skipped": 0,
+ "success": 0,
+ "up_for_reschedule": 0,
+ "up_for_retry": 0,
+ "upstream_failed": 0,
+ },
+ "end_date": None,
+ "note": None,
+ "queued_dttm": None,
+ "start_date": None,
+ "state": None,
+ "task_count": 1,
+ "task_id": "task3",
+ "try_number": 0,
+ },
+ ],
+ "version_number": 1,
+ },
+ ],
+ }