pierrejeambrun commented on code in PR #47270:
URL: https://github.com/apache/airflow/pull/47270#discussion_r1995287164
##########
airflow/api_fastapi/core_api/routes/public/import_error.py:
##########
@@ -36,18 +42,38 @@
ImportErrorResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import (
+ AccessView,
+ GetUserDep,
+ requires_access_dag,
+ requires_access_view,
+)
+from airflow.models import DagModel
from airflow.models.errors import ParseImportError
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+REDACTED_STACKTRACE = "REDACTED - you do not have read permission on all DAGs
in the file"
import_error_router = AirflowRouter(tags=["Import Error"],
prefix="/importErrors")
+def get_file_dag_ids(session: Session, filename: str) -> set[str]:
+ return {dag_id for dag_id in
session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == filename))}
+
+
@import_error_router.get(
"/{import_error_id}",
- responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ responses=create_openapi_http_exception_doc([status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]),
+ dependencies=[
+ Depends(requires_access_view(AccessView.IMPORT_ERRORS)),
+ Depends(requires_access_dag("GET")),
Review Comment:
I think we need to remove this
```suggestion
```
Unlike other endpoints, this is also checked directly in the route
implementation and handled differently:
- can_read_all_dags = auth_manager.is_authorized_dag(method="GET", user=user)
##########
tests/api_fastapi/core_api/routes/public/test_import_error.py:
##########
@@ -129,6 +140,53 @@ def test_get_import_error(
}
assert response.json() == expected_json
+ def test_should_raises_401_unauthenticated(self,
unauthenticated_test_client, setup):
+ import_error_id = setup[FILENAME1].id
+ response =
unauthenticated_test_client.get(f"/public/importErrors/{import_error_id}")
+ assert response.status_code == 401
+
+ def test_should_raises_403_unauthorized(self, unauthorized_test_client,
setup):
+ import_error_id = setup[FILENAME1].id
+ response =
unauthorized_test_client.get(f"/public/importErrors/{import_error_id}")
+ assert response.status_code == 403
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+ def
test_should_raises_403_unauthorized__user_can_not_read_any_dags_in_file(
+ self, mock_get_auth_manager, test_client, setup
+ ):
+ import_error_id = setup[FILENAME1].id
+ mock_is_authorized_dag =
mock_get_auth_manager.return_value.is_authorized_dag
+ mock_is_authorized_dag.return_value = False
+ mock_get_permitted_dag_ids =
mock_get_auth_manager.return_value.get_permitted_dag_ids
+ mock_get_permitted_dag_ids.return_value = set()
+ response = test_client.get(f"/public/importErrors/{import_error_id}")
+ mock_is_authorized_dag.assert_called_once_with(method="GET",
user=mock.ANY)
+ mock_get_permitted_dag_ids.assert_called_once_with(user=mock.ANY)
+ assert response.status_code == 403
+ assert response.json() == {"detail": "You do not have read permission
on any of the DAGs in the file"}
Review Comment:
When tests are big, with a lot of lines to prepare / mock etc., you can
structure them a little bit like this, or at least leave blank lines; it will
be easier to read.
That is a nitpick, non-blocking.
##########
airflow/api_fastapi/core_api/routes/public/import_error.py:
##########
@@ -36,18 +42,38 @@
ImportErrorResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import (
+ AccessView,
+ GetUserDep,
+ requires_access_dag,
+ requires_access_view,
+)
+from airflow.models import DagModel
from airflow.models.errors import ParseImportError
+if TYPE_CHECKING:
+ from sqlalchemy.orm import Session
+
+REDACTED_STACKTRACE = "REDACTED - you do not have read permission on all DAGs
in the file"
import_error_router = AirflowRouter(tags=["Import Error"],
prefix="/importErrors")
+def get_file_dag_ids(session: Session, filename: str) -> set[str]:
+ return {dag_id for dag_id in
session.scalars(select(DagModel.dag_id).where(DagModel.fileloc == filename))}
+
+
@import_error_router.get(
"/{import_error_id}",
- responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ responses=create_openapi_http_exception_doc([status.HTTP_403_FORBIDDEN,
status.HTTP_404_NOT_FOUND]),
Review Comment:
```suggestion
responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
```
Forbidden is already part of the base router.
##########
airflow/api_fastapi/core_api/routes/public/import_error.py:
##########
@@ -83,16 +131,48 @@ def get_import_errors(
),
],
session: SessionDep,
+ user: GetUserDep,
) -> ImportErrorCollectionResponse:
"""Get all import errors."""
+ statement = select(ParseImportError)
import_errors_select, total_entries = paginated_select(
- statement=select(ParseImportError),
+ statement=statement,
order_by=order_by,
offset=offset,
limit=limit,
session=session,
)
- import_errors = session.scalars(import_errors_select)
+
+ auth_manager = get_auth_manager()
+ can_read_all_dags = auth_manager.is_authorized_dag(method="GET", user=user)
+ if not can_read_all_dags:
+ # if the user doesn't have access to all DAGs, only display errors
from visible DAGs
+ readable_dag_ids = auth_manager.get_permitted_dag_ids(method="GET",
user=user)
+ dagfiles_stmt =
select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(readable_dag_ids))
+ statement =
statement.where(ParseImportError.filename.in_(dagfiles_stmt))
+ import_errors_select, total_entries = paginated_select(
+ statement=statement,
+ order_by=order_by,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
Review Comment:
+1
##########
tests/api_fastapi/core_api/routes/public/test_import_error.py:
##########
@@ -129,6 +140,53 @@ def test_get_import_error(
}
assert response.json() == expected_json
+ def test_should_raises_401_unauthenticated(self,
unauthenticated_test_client, setup):
+ import_error_id = setup[FILENAME1].id
+ response =
unauthenticated_test_client.get(f"/public/importErrors/{import_error_id}")
+ assert response.status_code == 401
+
+ def test_should_raises_403_unauthorized(self, unauthorized_test_client,
setup):
+ import_error_id = setup[FILENAME1].id
+ response =
unauthorized_test_client.get(f"/public/importErrors/{import_error_id}")
+ assert response.status_code == 403
+
+
@mock.patch("airflow.api_fastapi.core_api.routes.public.import_error.get_auth_manager")
+ def
test_should_raises_403_unauthorized__user_can_not_read_any_dags_in_file(
+ self, mock_get_auth_manager, test_client, setup
+ ):
+ import_error_id = setup[FILENAME1].id
+ mock_is_authorized_dag =
mock_get_auth_manager.return_value.is_authorized_dag
+ mock_is_authorized_dag.return_value = False
+ mock_get_permitted_dag_ids =
mock_get_auth_manager.return_value.get_permitted_dag_ids
+ mock_get_permitted_dag_ids.return_value = set()
+ response = test_client.get(f"/public/importErrors/{import_error_id}")
+ mock_is_authorized_dag.assert_called_once_with(method="GET",
user=mock.ANY)
+ mock_get_permitted_dag_ids.assert_called_once_with(user=mock.ANY)
+ assert response.status_code == 403
+ assert response.json() == {"detail": "You do not have read permission
on any of the DAGs in the file"}
Review Comment:
```suggestion
# Arrange
import_error_id = setup[FILENAME1].id
mock_is_authorized_dag =
mock_get_auth_manager.return_value.is_authorized_dag
mock_is_authorized_dag.return_value = False
mock_get_permitted_dag_ids =
mock_get_auth_manager.return_value.get_permitted_dag_ids
mock_get_permitted_dag_ids.return_value = set()
# Act
response = test_client.get(f"/public/importErrors/{import_error_id}")
# Assert
mock_is_authorized_dag.assert_called_once_with(method="GET",
user=mock.ANY)
mock_get_permitted_dag_ids.assert_called_once_with(user=mock.ANY)
assert response.status_code == 403
assert response.json() == {"detail": "You do not have read
permission on any of the DAGs in the file"}
```
##########
airflow/api_fastapi/core_api/routes/public/import_error.py:
##########
@@ -56,12 +82,34 @@ def get_import_error(
status.HTTP_404_NOT_FOUND,
f"The ImportError with import_error_id: `{import_error_id}` was
not found",
)
+ session.expunge(error)
+
+ auth_manager = get_auth_manager()
+ can_read_all_dags = auth_manager.is_authorized_dag(method="GET", user=user)
+ if not can_read_all_dags:
+ readable_dag_ids = auth_manager.get_permitted_dag_ids(user=user)
+ file_dag_ids = get_file_dag_ids(session, error.filename)
+
+ # Can the user read any DAGs in the file?
+ if not readable_dag_ids.intersection(file_dag_ids):
+ raise HTTPException(
+ status.HTTP_403_FORBIDDEN,
+ "You do not have read permission on any of the DAGs in the
file",
+ )
+
+ # Check if user has read access to all the DAGs defined in the file
+ if not file_dag_ids.issubset(readable_dag_ids):
+ error.stacktrace = REDACTED_STACKTRACE
return error
@import_error_router.get(
"",
+ dependencies=[
+ Depends(requires_access_view(AccessView.IMPORT_ERRORS)),
+ Depends(requires_access_dag("GET")),
Review Comment:
```suggestion
```
Same, you can access the endpoint even if you don't have access to all DAGs.
That's checked manually inside the body of the route.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]