This is an automated email from the ASF dual-hosted git repository.
vincbeck pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new dd1a86593f2 Fix import errors not showing on UI (#61163) (#61213)
dd1a86593f2 is described below
commit dd1a86593f2bd38b50b9f2a0a91529d16f89d0c4
Author: Rahul Vats <[email protected]>
AuthorDate: Thu Jan 29 23:10:09 2026 +0530
Fix import errors not showing on UI (#61163) (#61213)
Import errors stopped showing in the UI after #60801, which removed the early
bypass for users with access to all DAGs.
When a DAG file has an import error, the DAG doesn't exist in DagModel
(because it failed to import). The inner join with DagModel filters out these
import errors because there's no matching row.
This was always broken for users without "read all" permissions, but the
early-return path masked it for users who could read all DAGs.
---
.../core_api/routes/public/import_error.py | 35 ++++++++++++---
.../core_api/routes/public/test_import_error.py | 51 +++++++++++++++++-----
2 files changed, 71 insertions(+), 15 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
index 8485bd66a46..b4e3d8a514b 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/import_error.py
@@ -22,7 +22,7 @@ from operator import itemgetter
from typing import Annotated
from fastapi import Depends, HTTPException, status
-from sqlalchemy import and_, select
+from sqlalchemy import and_, or_, select
from airflow.api_fastapi.app import get_auth_manager
from airflow.api_fastapi.auth.managers.models.batch_apis import
IsAuthorizedDagRequest
@@ -84,6 +84,11 @@ def get_import_error(
file_dag_ids = set(
session.scalars(select(DagModel.dag_id).where(DagModel.fileloc ==
error.filename)).all()
)
+
+ # No DAGs in the file (failed to parse), nothing to check permissions
against
+ if not file_dag_ids:
+ return error
+
# Can the user read any DAGs in the file?
if not readable_dag_ids.intersection(file_dag_ids):
raise HTTPException(
@@ -129,7 +134,11 @@ def get_import_errors(
"""Get all import errors."""
auth_manager = get_auth_manager()
readable_dag_ids = auth_manager.get_authorized_dag_ids(method="GET",
user=user)
- # Build a cte that fetches dag_ids for each file location
+
+ # Subquery for files that have any DAGs
+ files_with_any_dags =
select(DagModel.relative_fileloc).distinct().subquery()
+
+ # CTE for DAGs the user can read
visible_files_cte = (
select(DagModel.relative_fileloc, DagModel.dag_id,
DagModel.bundle_name)
.where(DagModel.dag_id.in_(readable_dag_ids))
@@ -140,13 +149,23 @@ def get_import_errors(
# Each returned row will be a tuple: (ParseImportError, dag_id)
import_errors_stmt = (
select(ParseImportError, visible_files_cte.c.dag_id)
- .join(
+ .outerjoin(
+ files_with_any_dags,
+ ParseImportError.filename ==
files_with_any_dags.c.relative_fileloc,
+ )
+ .outerjoin(
visible_files_cte,
and_(
ParseImportError.filename ==
visible_files_cte.c.relative_fileloc,
ParseImportError.bundle_name ==
visible_files_cte.c.bundle_name,
),
)
+ .where(
+ or_(
+ files_with_any_dags.c.relative_fileloc.is_(None),
+ visible_files_cte.c.dag_id.isnot(None),
+ )
+ )
.order_by(ParseImportError.id)
)
@@ -164,14 +183,20 @@ def get_import_errors(
)
import_errors = []
- for import_error, file_dag_ids in import_errors_result:
+ for import_error, file_dag_ids_iter in import_errors_result:
+ dag_ids = [dag_id for _, dag_id in file_dag_ids_iter if dag_id is not
None]
+
+ if not dag_ids:
+ import_errors.append(import_error)
+ continue
+
# Check if user has read access to all the DAGs defined in the file
requests: Sequence[IsAuthorizedDagRequest] = [
{
"method": "GET",
"details": DagDetails(id=dag_id),
}
- for dag_id in file_dag_ids
+ for dag_id in dag_ids
]
if not auth_manager.batch_is_authorized_dag(requests, user=user):
session.expunge(import_error)
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
index 7e72e1401da..4d8e9aae1d6 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_import_error.py
@@ -235,6 +235,7 @@ class TestGetImportError:
response =
unauthorized_test_client.get(f"/importErrors/{import_error_id}")
assert response.status_code == 403
+ @pytest.mark.usefixtures("not_permitted_dag_model")
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
def
test_should_raises_403_unauthorized__user_can_not_read_any_dags_in_file(
self, mock_get_auth_manager, test_client, import_errors
@@ -272,6 +273,23 @@ class TestGetImportError:
"bundle_name": BUNDLE_NAME,
}
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+ def test_get_import_error__no_dag_in_dagmodel(self, mock_get_auth_manager,
test_client, import_errors):
+ """Test import error is returned when no DAG exists in DagModel."""
+ import_error_id = import_errors[0].id
+ set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager,
set())
+
+ response = test_client.get(f"/importErrors/{import_error_id}")
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "import_error_id": import_error_id,
+ "timestamp": from_datetime_to_zulu_without_ms(TIMESTAMP1),
+ "filename": FILENAME1,
+ "stack_trace": STACKTRACE1,
+ "bundle_name": BUNDLE_NAME,
+ }
+
class TestGetImportErrors:
@pytest.mark.parametrize(
@@ -395,7 +413,6 @@ class TestGetImportErrors:
),
],
)
- @pytest.mark.usefixtures("permitted_dag_model")
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
def test_user_can_not_read_all_dags_in_file(
self,
@@ -403,18 +420,17 @@ class TestGetImportErrors:
test_client,
batch_is_authorized_dag_return_value,
expected_stack_trace,
- permitted_dag_model,
+ permitted_dag_model_all,
import_errors,
):
mock_get_authorized_dag_ids =
set_mock_auth_manager__get_authorized_dag_ids(
- mock_get_auth_manager, {permitted_dag_model.dag_id}
+ mock_get_auth_manager, {"dag_id1"}
)
set_mock_auth_manager__batch_is_authorized_dag(
mock_get_auth_manager, batch_is_authorized_dag_return_value
)
# Act
- with assert_queries_count(3):
- response = test_client.get("/importErrors")
+ response = test_client.get("/importErrors")
# Assert
mock_get_authorized_dag_ids.assert_called_once_with(method="GET",
user=mock.ANY)
assert response.status_code == 200
@@ -432,14 +448,13 @@ class TestGetImportErrors:
],
}
- @pytest.mark.usefixtures("permitted_dag_model")
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
def test_bundle_name_join_condition_for_import_errors(
- self, mock_get_auth_manager, test_client, permitted_dag_model,
import_errors, session
+ self, mock_get_auth_manager, test_client, permitted_dag_model_all,
import_errors, session
):
"""Test that the bundle_name join condition works correctly."""
mock_get_authorized_dag_ids =
set_mock_auth_manager__get_authorized_dag_ids(
- mock_get_auth_manager, {permitted_dag_model.dag_id}
+ mock_get_auth_manager, {"dag_id1"}
)
set_mock_auth_manager__batch_is_authorized_dag(mock_get_auth_manager,
True)
@@ -456,10 +471,11 @@ class TestGetImportErrors:
assert response_json["import_errors"][0]["filename"] == FILENAME1
# Now test that removing the bundle_name from the DagModel causes the
import error to not be returned
- permitted_dag_model.bundle_name = "another_bundle_name"
+ dag_model1 = session.get(DagModel, "dag_id1")
session.add(DagBundleModel(name="another_bundle_name"))
session.flush()
- session.merge(permitted_dag_model)
+ dag_model1.bundle_name = "another_bundle_name"
+ session.merge(dag_model1)
session.commit()
response2 = test_client.get("/importErrors")
@@ -469,3 +485,18 @@ class TestGetImportErrors:
response_json2 = response2.json()
assert response_json2["total_entries"] == 0
assert response_json2["import_errors"] == []
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+ def test_get_import_errors__no_dag_in_dagmodel(self,
mock_get_auth_manager, test_client, import_errors):
+ """Test import errors are returned when no DAG exists in DagModel."""
+ set_mock_auth_manager__get_authorized_dag_ids(mock_get_auth_manager,
set())
+
+ response = test_client.get("/importErrors")
+
+ assert response.status_code == 200
+ response_json = response.json()
+ assert response_json["total_entries"] == 3
+ filenames = [error["filename"] for error in
response_json["import_errors"]]
+ assert FILENAME1 in filenames
+ assert FILENAME2 in filenames
+ assert FILENAME3 in filenames