This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 8f36246f458 Fix import errors total_entries count with multiple DAGs per file (#67550)
8f36246f458 is described below
commit 8f36246f458d18c2443a3b018bfeb6699fe1aabd
Author: Aditya Patel <[email protected]>
AuthorDate: Mon Jun 8 12:28:31 2026 -0400
Fix import errors total_entries count with multiple DAGs per file (#67550)
* Fix import errors total entries count
* Rerun CI
* Paginate import errors by distinct ids
* Fix import error pagination ordering
* Add import error pagination regression test
---
.../core_api/routes/public/import_error.py | 35 +++++++++-
.../core_api/routes/public/test_import_error.py | 79 ++++++++++++++++++++++
2 files changed, 111 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
index 8f8fcca42b9..2fcc94436b2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
@@ -22,7 +22,7 @@ from operator import itemgetter
from typing import Annotated
from fastapi import Depends, HTTPException, status
-from sqlalchemy import and_, or_, select
+from sqlalchemy import and_, func, or_, select
from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.models.batch_apis import
IsAuthorizedDagRequest
@@ -31,6 +31,7 @@ from airflow.api_fastapi.auth.managers.models.resource_details import (
)
from airflow.api_fastapi.common.db.common import (
SessionDep,
+ apply_filters_to_select,
paginated_select,
)
from airflow.api_fastapi.common.parameters import (
@@ -208,14 +209,42 @@ def get_import_errors(
.order_by(ParseImportError.id)
)
- # Paginate the import errors query
- import_errors_select, total_entries = paginated_select(
+ filtered_import_errors_stmt = apply_filters_to_select(
statement=import_errors_stmt,
filters=[filename_pattern, filename_prefix_pattern],
+ )
+ import_error_ids_stmt = (
+ filtered_import_errors_stmt.with_only_columns(
+ ParseImportError.id,
+ ParseImportError.timestamp,
+ ParseImportError.filename,
+ ParseImportError.bundle_name,
+ ParseImportError.stacktrace,
+ )
+ .distinct()
+ .order_by(None)
+ )
+ total_entries =
session.scalar(select(func.count()).select_from(import_error_ids_stmt.subquery()))
or 0
+
+ # Paginate distinct import error IDs first so limit/offset apply to
+ # import error objects, not to the joined Dag rows.
+ paginated_import_error_ids_select, _ = paginated_select(
+ statement=import_error_ids_stmt,
order_by=order_by,
offset=offset,
limit=limit,
session=session,
+ return_total_entries=False,
+ )
+ paginated_import_error_ids = paginated_import_error_ids_select.subquery()
+
+ # Fetch all joined Dag rows for the paginated import error IDs before
+ # grouping, so each returned import error still has the full Dag set.
+ import_errors_select = apply_filters_to_select(
+ statement=filtered_import_errors_stmt.where(
+ ParseImportError.id.in_(select(paginated_import_error_ids.c.id))
+ ),
+ filters=[order_by],
)
import_errors_result: Iterable[tuple[ParseImportError, Iterable]] =
groupby(
session.execute(import_errors_select), itemgetter(0)
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
index ef5427acb69..a595d139f92 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
@@ -395,6 +395,85 @@ class TestGetImportErrors:
response = unauthorized_test_client.get("/importErrors")
assert response.status_code == 403
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+ def
test_total_entries_counts_distinct_import_errors_when_file_has_multiple_dags(
+ self,
+ mock_get_auth_manager,
+ test_client,
+ testing_dag_bundle,
+ session,
+ ):
+ dag_models = [
+ DagModel(
+ fileloc=FILENAME1,
+ relative_fileloc=FILENAME1,
+ dag_id=f"dag_id{i}",
+ is_paused=False,
+ bundle_name=BUNDLE_NAME,
+ )
+ for i in range(1, 4)
+ ]
+ session.add_all(dag_models)
+ session.commit()
+
+ readable_dag_ids = {dag_model.dag_id for dag_model in dag_models}
+ set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager,
readable_dag_ids)
+ mock_batch_is_authorized_dag =
set_mock_auth_manager__batch_is_authorized_dag(
+ mock_get_auth_manager, True
+ )
+
+ response = test_client.get("/importErrors",
params={"filename_pattern": FILENAME1, "limit": 1})
+
+ assert response.status_code == 200
+ response_json = response.json()
+ assert response_json["total_entries"] == 1
+ assert [import_error["filename"] for import_error in
response_json["import_errors"]] == [FILENAME1]
+ assert len(mock_batch_is_authorized_dag.call_args.args[0]) == 3
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+ def test_pagination_applies_to_distinct_import_errors_not_joined_dag_rows(
+ self,
+ mock_get_auth_manager,
+ test_client,
+ testing_dag_bundle,
+ session,
+ ):
+ dag_models = [
+ DagModel(
+ fileloc=FILENAME1,
+ relative_fileloc=FILENAME1,
+ dag_id=f"dag_id{i}",
+ is_paused=False,
+ bundle_name=BUNDLE_NAME,
+ )
+ for i in range(1, 4)
+ ]
+ dag_models.append(
+ DagModel(
+ fileloc=FILENAME2,
+ relative_fileloc=FILENAME2,
+ dag_id="dag_id4",
+ is_paused=False,
+ bundle_name=BUNDLE_NAME,
+ )
+ )
+ session.add_all(dag_models)
+ session.commit()
+
+ readable_dag_ids = {dag_model.dag_id for dag_model in dag_models}
+ set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager,
readable_dag_ids)
+ set_mock_auth_manager__batch_is_authorized_dag(mock_get_auth_manager,
True)
+
+ response = test_client.get(
+ "/importErrors",
+ params={"filename_pattern": "test_filename", "limit": 1, "offset":
1, "order_by": "id"},
+ )
+
+ assert response.status_code == 200
+ response_json = response.json()
+ assert response_json["total_entries"] == 2
+ assert [import_error["filename"] for import_error in
response_json["import_errors"]] == [FILENAME2]
+
@pytest.mark.parametrize(
("team", "batch_is_authorized_dag_return_value",
"expected_stack_trace"),
[