This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f36246f458 Fix import errors total_entries count with multiple DAGs per file (#67550)
8f36246f458 is described below

commit 8f36246f458d18c2443a3b018bfeb6699fe1aabd
Author: Aditya Patel <[email protected]>
AuthorDate: Mon Jun 8 12:28:31 2026 -0400

    Fix import errors total_entries count with multiple DAGs per file (#67550)
    
    * Fix import errors total entries count
    
    * Rerun CI
    
    * Paginate import errors by distinct ids
    
    * Fix import error pagination ordering
    
    * Add import error pagination regression test
---
 .../core_api/routes/public/import_error.py         | 35 +++++++++-
 .../core_api/routes/public/test_import_error.py    | 79 ++++++++++++++++++++++
 2 files changed, 111 insertions(+), 3 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
index 8f8fcca42b9..2fcc94436b2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
@@ -22,7 +22,7 @@ from operator import itemgetter
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, status
-from sqlalchemy import and_, or_, select
+from sqlalchemy import and_, func, or_, select
 
 from airflow.api_fastapi.app import get_auth_manager
 from airflow.api_fastapi.auth.managers.models.batch_apis import 
IsAuthorizedDagRequest
@@ -31,6 +31,7 @@ from airflow.api_fastapi.auth.managers.models.resource_details import (
 )
 from airflow.api_fastapi.common.db.common import (
     SessionDep,
+    apply_filters_to_select,
     paginated_select,
 )
 from airflow.api_fastapi.common.parameters import (
@@ -208,14 +209,42 @@ def get_import_errors(
         .order_by(ParseImportError.id)
     )
 
-    # Paginate the import errors query
-    import_errors_select, total_entries = paginated_select(
+    filtered_import_errors_stmt = apply_filters_to_select(
         statement=import_errors_stmt,
         filters=[filename_pattern, filename_prefix_pattern],
+    )
+    import_error_ids_stmt = (
+        filtered_import_errors_stmt.with_only_columns(
+            ParseImportError.id,
+            ParseImportError.timestamp,
+            ParseImportError.filename,
+            ParseImportError.bundle_name,
+            ParseImportError.stacktrace,
+        )
+        .distinct()
+        .order_by(None)
+    )
+    total_entries = 
session.scalar(select(func.count()).select_from(import_error_ids_stmt.subquery()))
 or 0
+
+    # Paginate distinct import error IDs first so limit/offset apply to
+    # import error objects, not to the joined Dag rows.
+    paginated_import_error_ids_select, _ = paginated_select(
+        statement=import_error_ids_stmt,
         order_by=order_by,
         offset=offset,
         limit=limit,
         session=session,
+        return_total_entries=False,
+    )
+    paginated_import_error_ids = paginated_import_error_ids_select.subquery()
+
+    # Fetch all joined Dag rows for the paginated import error IDs before
+    # grouping, so each returned import error still has the full Dag set.
+    import_errors_select = apply_filters_to_select(
+        statement=filtered_import_errors_stmt.where(
+            ParseImportError.id.in_(select(paginated_import_error_ids.c.id))
+        ),
+        filters=[order_by],
     )
     import_errors_result: Iterable[tuple[ParseImportError, Iterable]] = 
groupby(
         session.execute(import_errors_select), itemgetter(0)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
index ef5427acb69..a595d139f92 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
@@ -395,6 +395,85 @@ class TestGetImportErrors:
         response = unauthorized_test_client.get("/importErrors")
         assert response.status_code == 403
 
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+    def 
test_total_entries_counts_distinct_import_errors_when_file_has_multiple_dags(
+        self,
+        mock_get_auth_manager,
+        test_client,
+        testing_dag_bundle,
+        session,
+    ):
+        dag_models = [
+            DagModel(
+                fileloc=FILENAME1,
+                relative_fileloc=FILENAME1,
+                dag_id=f"dag_id{i}",
+                is_paused=False,
+                bundle_name=BUNDLE_NAME,
+            )
+            for i in range(1, 4)
+        ]
+        session.add_all(dag_models)
+        session.commit()
+
+        readable_dag_ids = {dag_model.dag_id for dag_model in dag_models}
+        set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager, 
readable_dag_ids)
+        mock_batch_is_authorized_dag = 
set_mock_auth_manager__batch_is_authorized_dag(
+            mock_get_auth_manager, True
+        )
+
+        response = test_client.get("/importErrors", 
params={"filename_pattern": FILENAME1, "limit": 1})
+
+        assert response.status_code == 200
+        response_json = response.json()
+        assert response_json["total_entries"] == 1
+        assert [import_error["filename"] for import_error in 
response_json["import_errors"]] == [FILENAME1]
+        assert len(mock_batch_is_authorized_dag.call_args.args[0]) == 3
+
+    
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+    def test_pagination_applies_to_distinct_import_errors_not_joined_dag_rows(
+        self,
+        mock_get_auth_manager,
+        test_client,
+        testing_dag_bundle,
+        session,
+    ):
+        dag_models = [
+            DagModel(
+                fileloc=FILENAME1,
+                relative_fileloc=FILENAME1,
+                dag_id=f"dag_id{i}",
+                is_paused=False,
+                bundle_name=BUNDLE_NAME,
+            )
+            for i in range(1, 4)
+        ]
+        dag_models.append(
+            DagModel(
+                fileloc=FILENAME2,
+                relative_fileloc=FILENAME2,
+                dag_id="dag_id4",
+                is_paused=False,
+                bundle_name=BUNDLE_NAME,
+            )
+        )
+        session.add_all(dag_models)
+        session.commit()
+
+        readable_dag_ids = {dag_model.dag_id for dag_model in dag_models}
+        set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager, 
readable_dag_ids)
+        set_mock_auth_manager__batch_is_authorized_dag(mock_get_auth_manager, 
True)
+
+        response = test_client.get(
+            "/importErrors",
+            params={"filename_pattern": "test_filename", "limit": 1, "offset": 
1, "order_by": "id"},
+        )
+
+        assert response.status_code == 200
+        response_json = response.json()
+        assert response_json["total_entries"] == 2
+        assert [import_error["filename"] for import_error in 
response_json["import_errors"]] == [FILENAME2]
+
     @pytest.mark.parametrize(
         ("team", "batch_is_authorized_dag_return_value", 
"expected_stack_trace"),
         [

Reply via email to