Copilot commented on code in PR #63880:
URL: https://github.com/apache/airflow/pull/63880#discussion_r3025349239
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/connections.py:
##########
@@ -31,9 +32,30 @@
async def has_connection_access(
connection_id: str = Path(),
token=CurrentTIToken,
+ session: AsyncSessionDep = Depends(),
Review Comment:
`session: AsyncSessionDep = Depends()` doesn’t match the established usage
of `AsyncSessionDep` in this repo (it’s typically passed as `session:
AsyncSessionDep` without an extra `Depends()`, e.g.
`execution_api/security.py:get_team_name_dep`). Recommend removing `=
Depends()` to ensure FastAPI uses the dependency alias as intended and to avoid
potential double-injection issues.
```suggestion
session: AsyncSessionDep,
```
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/variables.py:
##########
@@ -34,10 +35,31 @@ async def has_variable_access(
request: Request,
variable_key: str = Path(),
token=CurrentTIToken,
+ session: AsyncSessionDep = Depends(),
Review Comment:
`session: AsyncSessionDep = Depends()` is likely incorrect for this
codebase: `AsyncSessionDep` is already a dependency alias, and other functions
accept it as a plain parameter (e.g.
`execution_api/security.py:get_team_name_dep(session: AsyncSessionDep, ...)`).
Suggest changing the signature to `session: AsyncSessionDep` (no `Depends()`)
so that the session is resolved through the alias alone, consistent with the
rest of the codebase.
```suggestion
session: AsyncSessionDep,
```
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py:
##########
@@ -131,7 +103,64 @@ def test_xcom_access_denied(self, client, caplog):
"reason": "access_denied",
}
}
- assert any(msg.startswith("Checking read XCom access") for msg in
caplog.messages)
+
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_xcom_get_access_allowed(self, client, session,
create_task_instance):
+ ti = create_task_instance()
+ x = XComModel(
+ key="xcom_1",
+ value="value1",
+ dag_run_id=ti.dag_run.id,
+ run_id=ti.run_id,
+ task_id=ti.task_id,
+ dag_id=ti.dag_id,
+ )
+ session.add(x)
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
Review Comment:
Fetching the execution sub-app via `client.app.routes[-1]` is brittle (route
ordering can change). Since `tests/unit/api_fastapi/execution_api/conftest.py`
already provides an `exec_app` fixture (and a `_get_execution_api_app` helper),
prefer using the `exec_app` fixture to set `dependency_overrides[_jwt_bearer]`
in this test.
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_variables.py:
##########
@@ -127,7 +104,32 @@ def test_variable_get_access_denied(self, client, caplog):
}
}
- assert any(msg.startswith("Checking read access for task instance")
for msg in caplog.messages)
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_variable_get_access_allowed(self, client, session,
create_task_instance):
+ Variable.set(key="key_auth", value="value_auth", session=session)
+ ti = create_task_instance()
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
Review Comment:
These tests locate the execution sub-app via `client.app.routes[-1]`, which
is brittle (route ordering can change). The test suite already provides an
`exec_app` fixture in `tests/unit/api_fastapi/execution_api/conftest.py` that
reliably returns the mounted `/execution` app. Prefer using that fixture (or
the `_get_execution_api_app` helper) when setting `dependency_overrides`.
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py:
##########
@@ -517,7 +546,54 @@ def test_xcom_access_denied(self, client, caplog):
"reason": "access_denied",
}
}
- assert any(msg.startswith("Checking write XCom access") for msg in
caplog.messages)
+
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_xcom_set_access_allowed(self, client, session,
create_task_instance):
+ ti = create_task_instance()
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
+
+ response =
client.post(f"/execution/xcoms/{ti.dag_id}/{ti.run_id}/{ti.task_id}/xcom_1",
json='"value1"')
+
+ assert response.status_code == 201
+
+ exec_app.dependency_overrides.pop(_jwt_bearer)
+
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_xcom_cross_task_write_denied(self, client, session,
create_task_instance):
+ ti = create_task_instance()
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
Review Comment:
This test relies on `client.app.routes[-1]` to locate the mounted
`/execution` app before overriding `_jwt_bearer`. Route ordering can change;
consider using the `exec_app` fixture (which searches for the `/execution`
mount) to make the override robust.
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_connections.py:
##########
@@ -128,3 +107,37 @@ def test_connection_get_access_denied(self, client):
"message": "Task does not have access to connection test_conn",
}
}
+
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_connection_get_access_allowed(self, client, session,
create_task_instance):
+ connection = Connection(
+ conn_id="test_conn_auth",
+ conn_type="http",
+ description="description",
+ host="localhost",
+ )
+ session.add(connection)
+
+ ti = create_task_instance()
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
Review Comment:
Using `client.app.routes[-1]` to obtain the execution API sub-app is fragile
(route order isn’t guaranteed). There is already an `exec_app` fixture in
`tests/unit/api_fastapi/execution_api/conftest.py` that finds the `/execution`
mount. Consider using that fixture instead when applying `_jwt_bearer`
overrides.
##########
airflow-core/src/airflow/api_fastapi/execution_api/routes/xcoms.py:
##########
@@ -45,12 +45,45 @@ async def has_xcom_access(
xcom_key: Annotated[str, Path(alias="key", min_length=1)],
request: Request,
token=CurrentTIToken,
+ session: AsyncSessionDep = Depends(),
Review Comment:
`session: AsyncSessionDep = Depends()` is inconsistent with how
`SessionDep`/`AsyncSessionDep` are used elsewhere (e.g.
`execution_api/security.py:get_team_name_dep` takes `session: AsyncSessionDep`
without `Depends()`). Using `Depends()` here can lead FastAPI to ignore the
`Annotated[..., Depends(_get_async_session)]` metadata or apply dependencies
twice. Recommend removing the default and declaring the parameter as `session:
AsyncSessionDep`.
```suggestion
session: AsyncSessionDep,
```
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py:
##########
@@ -131,7 +103,64 @@ def test_xcom_access_denied(self, client, caplog):
"reason": "access_denied",
}
}
- assert any(msg.startswith("Checking read XCom access") for msg in
caplog.messages)
+
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_xcom_get_access_allowed(self, client, session,
create_task_instance):
+ ti = create_task_instance()
+ x = XComModel(
+ key="xcom_1",
+ value="value1",
+ dag_run_id=ti.dag_run.id,
+ run_id=ti.run_id,
+ task_id=ti.task_id,
+ dag_id=ti.dag_id,
+ )
+ session.add(x)
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
+
+ response =
client.get(f"/execution/xcoms/{ti.dag_id}/{ti.run_id}/{ti.task_id}/xcom_1")
+
+ assert response.status_code == 200
+ assert response.json() == {"key": "xcom_1", "value": "value1"}
+
+ exec_app.dependency_overrides.pop(_jwt_bearer)
+
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_xcom_cross_dag_read_denied(self, client, session,
create_task_instance):
+ ti = create_task_instance()
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
+
Review Comment:
Using `client.app.routes[-1]` to locate the `/execution` sub-app is not
stable across app/router changes. Consider using the existing `exec_app`
fixture from `tests/unit/api_fastapi/execution_api/conftest.py` to apply the
`_jwt_bearer` override for this test.
##########
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_xcoms.py:
##########
@@ -517,7 +546,54 @@ def test_xcom_access_denied(self, client, caplog):
"reason": "access_denied",
}
}
- assert any(msg.startswith("Checking write XCom access") for msg in
caplog.messages)
+
+ @mock.patch("airflow.settings.ENABLE_EXECUTION_API_AUTHZ", True)
+ def test_xcom_set_access_allowed(self, client, session,
create_task_instance):
+ ti = create_task_instance()
+ session.commit()
+
+ from fastapi import Request
+
+ from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+ from airflow.api_fastapi.execution_api.security import _jwt_bearer
+
+ async def mock_jwt_bearer(request: Request):
+ return TIToken(id=ti.id, claims={"sub": str(ti.id), "scope":
"execution"})
+
+ last_route = client.app.routes[-1]
+ exec_app = last_route.app
+ exec_app.dependency_overrides[_jwt_bearer] = mock_jwt_bearer
Review Comment:
`last_route = client.app.routes[-1]` is a fragile way to access the
execution API sub-app for dependency overrides. Prefer using the existing
`exec_app` fixture from `tests/unit/api_fastapi/execution_api/conftest.py` in
this test to avoid coupling to route ordering.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]