This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f52c471ed3a AIP-72: Add "Get Connection" endpoint for Execution API
(#43767)
f52c471ed3a is described below
commit f52c471ed3a396a8ff445ba5dcf1d59d4c3ac3cb
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Nov 7 17:11:37 2024 +0000
AIP-72: Add "Get Connection" endpoint for Execution API (#43767)
This commit introduces a new endpoint,
`/execution/connection/{connection_id}`, in the Execution API to retrieve
connection details. T
- Added `get_connection` endpoint to fetch connection details by
`connection_id`.
- Integrated placeholder function `get_task_token()` for token-based access
control.
- Implemented a placeholder `check_connection_access` function to validate
task permissions for each connection request.
- Access to connections is determined by the `allowed_connections`
attribute in the task token (currently mocked).
- Endpoint returns a 403 Forbidden status if the task does not have
permission for the requested connection.
- Placeholder `get_task_token()` function simulates task authentication;
future updates will include JWT-based authentication.
- Replace the placeholder `get_task_token()` with actual JWT-based
authentication to dynamically determine task permissions.
---
.../api_fastapi/execution_api/routes/__init__.py | 2 +
.../execution_api/routes/connections.py | 88 +++++++++++++++++
airflow/api_fastapi/execution_api/schemas.py | 20 ++++
.../execution_api/routes/test_connection.py | 107 +++++++++++++++++++++
4 files changed, 217 insertions(+)
diff --git a/airflow/api_fastapi/execution_api/routes/__init__.py
b/airflow/api_fastapi/execution_api/routes/__init__.py
index 55ee56b6168..c2ee885fab6 100644
--- a/airflow/api_fastapi/execution_api/routes/__init__.py
+++ b/airflow/api_fastapi/execution_api/routes/__init__.py
@@ -17,9 +17,11 @@
from __future__ import annotations
from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.execution_api.routes.connections import
connection_router
from airflow.api_fastapi.execution_api.routes.health import health_router
from airflow.api_fastapi.execution_api.routes.task_instance import ti_router
execution_api_router = AirflowRouter()
+execution_api_router.include_router(connection_router)
execution_api_router.include_router(health_router)
execution_api_router.include_router(ti_router)
diff --git a/airflow/api_fastapi/execution_api/routes/connections.py
b/airflow/api_fastapi/execution_api/routes/connections.py
new file mode 100644
index 00000000000..c5f5e97542b
--- /dev/null
+++ b/airflow/api_fastapi/execution_api/routes/connections.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+
+from fastapi import Depends, HTTPException, status
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.execution_api import schemas
+from airflow.exceptions import AirflowNotFoundException
+from airflow.models.connection import Connection
+
+# TODO: Add dependency on JWT token
+connection_router = AirflowRouter(
+ prefix="/connection",
+ tags=["Connection"],
+ responses={status.HTTP_404_NOT_FOUND: {"description": "Connection not
found"}},
+)
+
+log = logging.getLogger(__name__)
+
+
+def get_task_token() -> schemas.TIToken:
+ """TODO: Placeholder for task identity authentication. This should be
replaced with actual JWT decoding and validation."""
+ return schemas.TIToken(ti_key="test_key")
+
+
+@connection_router.get(
+ "/{connection_id}",
+ responses={
+ status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
+ status.HTTP_403_FORBIDDEN: {"description": "Task does not have access
to the connection"},
+ },
+)
+async def get_connection(
+ connection_id: str,
+ token: Annotated[schemas.TIToken, Depends(get_task_token)],
+) -> schemas.ConnectionResponse:
+ """Get an Airflow connection."""
+ if not has_connection_access(connection_id, token):
+ raise HTTPException(
+ status_code=status.HTTP_403_FORBIDDEN,
+ detail={
+ "reason": "access_denied",
+ "message": f"Task does not have access to connection
{connection_id}",
+ },
+ )
+ try:
+ connection = Connection.get_connection_from_secrets(connection_id)
+ except AirflowNotFoundException:
+ raise HTTPException(
+ status.HTTP_404_NOT_FOUND,
+ detail={
+ "reason": "not_found",
+ "message": f"Connection with ID {connection_id} not found",
+ },
+ )
+ return schemas.ConnectionResponse.model_validate(connection,
from_attributes=True)
+
+
+def has_connection_access(connection_id: str, token: schemas.TIToken) -> bool:
+ """Check if the task has access to the connection."""
+ # TODO: Placeholder for actual implementation
+
+ ti_key = token.ti_key
+ log.debug(
+ "Checking access for task instance with key '%s' to connection '%s'",
+ ti_key,
+ connection_id,
+ )
+ return True
diff --git a/airflow/api_fastapi/execution_api/schemas.py
b/airflow/api_fastapi/execution_api/schemas.py
index 90c07cf3b6a..c61718bbef2 100644
--- a/airflow/api_fastapi/execution_api/schemas.py
+++ b/airflow/api_fastapi/execution_api/schemas.py
@@ -119,3 +119,23 @@ class TIHeartbeatInfo(BaseModel):
hostname: str
pid: int
+
+
+class ConnectionResponse(BaseModel):
+ """Connection schema for responses with fields that are needed for
Runtime."""
+
+ conn_id: str
+ conn_type: str
+ host: str | None
+ schema_: str | None = Field(alias="schema")
+ login: str | None
+ password: str | None
+ port: int | None
+ extra: str | None
+
+
+# TODO: This is a placeholder for Task Identity Token schema.
+class TIToken(BaseModel):
+ """Task Identity Token."""
+
+ ti_key: str
diff --git a/tests/api_fastapi/execution_api/routes/test_connection.py
b/tests/api_fastapi/execution_api/routes/test_connection.py
new file mode 100644
index 00000000000..107fb8741ea
--- /dev/null
+++ b/tests/api_fastapi/execution_api/routes/test_connection.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.models.connection import Connection
+
+pytestmark = pytest.mark.db_test
+
+
+class TestGetConnection:
+ def test_connection_get_from_db(self, client, session):
+ connection = Connection(
+ conn_id="test_conn",
+ conn_type="http",
+ description="description",
+ host="localhost",
+ login="root",
+ password="admin",
+ schema="http",
+ port=8080,
+ extra='{"x_secret": "testsecret", "y_secret": "test"}',
+ )
+
+ session.add(connection)
+ session.commit()
+
+ response = client.get("/execution/connection/test_conn")
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "conn_id": "test_conn",
+ "conn_type": "http",
+ "host": "localhost",
+ "login": "root",
+ "password": "admin",
+ "schema": "http",
+ "port": 8080,
+ "extra": '{"x_secret": "testsecret", "y_secret": "test"}',
+ }
+
+ # Remove connection
+ session.delete(connection)
+ session.commit()
+
+ @mock.patch.dict(
+ "os.environ",
+ {"AIRFLOW_CONN_TEST_CONN2": '{"uri":
"http://root:admin@localhost:8080/https?headers=header"}'},
+ )
+ def test_connection_get_from_env_var(self, client, session):
+ response = client.get("/execution/connection/test_conn2")
+
+ assert response.status_code == 200
+ assert response.json() == {
+ "conn_id": "test_conn2",
+ "conn_type": "http",
+ "host": "localhost",
+ "login": "root",
+ "password": "admin",
+ "schema": "https",
+ "port": 8080,
+ "extra": '{"headers": "header"}',
+ }
+
+ def test_connection_get_not_found(self, client):
+ response = client.get("/execution/connection/non_existent_test_conn")
+
+ assert response.status_code == 404
+ assert response.json() == {
+ "detail": {
+ "message": "Connection with ID non_existent_test_conn not
found",
+ "reason": "not_found",
+ }
+ }
+
+ def test_connection_get_access_denied(self, client):
+ with mock.patch(
+
"airflow.api_fastapi.execution_api.routes.connections.has_connection_access",
return_value=False
+ ):
+ response = client.get("/execution/connection/test_conn")
+
+ # Assert response status code and detail for access denied
+ assert response.status_code == 403
+ assert response.json() == {
+ "detail": {
+ "reason": "access_denied",
+ "message": "Task does not have access to connection test_conn",
+ }
+ }