This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new f52c471ed3a AIP-72: Add "Get Connection" endpoint for Execution API 
(#43767)
f52c471ed3a is described below

commit f52c471ed3a396a8ff445ba5dcf1d59d4c3ac3cb
Author: Kaxil Naik <[email protected]>
AuthorDate: Thu Nov 7 17:11:37 2024 +0000

    AIP-72: Add "Get Connection" endpoint for Execution API (#43767)
    
    This commit introduces a new endpoint, 
`/execution/connection/{connection_id}`, in the Execution API to retrieve 
connection details. T
    - Added `get_connection` endpoint to fetch connection details by 
`connection_id`.
    - Integrated placeholder function `get_task_token()` for token-based access 
control.
    - Implemented a placeholder `check_connection_access` function to validate 
task permissions for each connection request.
    
    - Access to connections is determined by the `allowed_connections` 
attribute in the task token (currently mocked).
    - Endpoint returns a 403 Forbidden status if the task does not have 
permission for the requested connection.
    - Placeholder `get_task_token()` function simulates task authentication; 
future updates will include JWT-based authentication.
    
    - Replace the placeholder `get_task_token()` with actual JWT-based 
authentication to dynamically determine task permissions.
---
 .../api_fastapi/execution_api/routes/__init__.py   |   2 +
 .../execution_api/routes/connections.py            |  88 +++++++++++++++++
 airflow/api_fastapi/execution_api/schemas.py       |  20 ++++
 .../execution_api/routes/test_connection.py        | 107 +++++++++++++++++++++
 4 files changed, 217 insertions(+)

diff --git a/airflow/api_fastapi/execution_api/routes/__init__.py 
b/airflow/api_fastapi/execution_api/routes/__init__.py
index 55ee56b6168..c2ee885fab6 100644
--- a/airflow/api_fastapi/execution_api/routes/__init__.py
+++ b/airflow/api_fastapi/execution_api/routes/__init__.py
@@ -17,9 +17,11 @@
 from __future__ import annotations
 
 from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.execution_api.routes.connections import 
connection_router
 from airflow.api_fastapi.execution_api.routes.health import health_router
 from airflow.api_fastapi.execution_api.routes.task_instance import ti_router
 
 execution_api_router = AirflowRouter()
+execution_api_router.include_router(connection_router)
 execution_api_router.include_router(health_router)
 execution_api_router.include_router(ti_router)
diff --git a/airflow/api_fastapi/execution_api/routes/connections.py 
b/airflow/api_fastapi/execution_api/routes/connections.py
new file mode 100644
index 00000000000..c5f5e97542b
--- /dev/null
+++ b/airflow/api_fastapi/execution_api/routes/connections.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+
+from fastapi import Depends, HTTPException, status
+from typing_extensions import Annotated
+
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.execution_api import schemas
+from airflow.exceptions import AirflowNotFoundException
+from airflow.models.connection import Connection
+
+# TODO: Add dependency on JWT token
+connection_router = AirflowRouter(
+    prefix="/connection",
+    tags=["Connection"],
+    responses={status.HTTP_404_NOT_FOUND: {"description": "Connection not 
found"}},
+)
+
+log = logging.getLogger(__name__)
+
+
+def get_task_token() -> schemas.TIToken:
+    """TODO: Placeholder for task identity authentication. This should be 
replaced with actual JWT decoding and validation."""
+    return schemas.TIToken(ti_key="test_key")
+
+
+@connection_router.get(
+    "/{connection_id}",
+    responses={
+        status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
+        status.HTTP_403_FORBIDDEN: {"description": "Task does not have access 
to the connection"},
+    },
+)
+async def get_connection(
+    connection_id: str,
+    token: Annotated[schemas.TIToken, Depends(get_task_token)],
+) -> schemas.ConnectionResponse:
+    """Get an Airflow connection."""
+    if not has_connection_access(connection_id, token):
+        raise HTTPException(
+            status_code=status.HTTP_403_FORBIDDEN,
+            detail={
+                "reason": "access_denied",
+                "message": f"Task does not have access to connection 
{connection_id}",
+            },
+        )
+    try:
+        connection = Connection.get_connection_from_secrets(connection_id)
+    except AirflowNotFoundException:
+        raise HTTPException(
+            status.HTTP_404_NOT_FOUND,
+            detail={
+                "reason": "not_found",
+                "message": f"Connection with ID {connection_id} not found",
+            },
+        )
+    return schemas.ConnectionResponse.model_validate(connection, 
from_attributes=True)
+
+
+def has_connection_access(connection_id: str, token: schemas.TIToken) -> bool:
+    """Check if the task has access to the connection."""
+    # TODO: Placeholder for actual implementation
+
+    ti_key = token.ti_key
+    log.debug(
+        "Checking access for task instance with key '%s' to connection '%s'",
+        ti_key,
+        connection_id,
+    )
+    return True
diff --git a/airflow/api_fastapi/execution_api/schemas.py 
b/airflow/api_fastapi/execution_api/schemas.py
index 90c07cf3b6a..c61718bbef2 100644
--- a/airflow/api_fastapi/execution_api/schemas.py
+++ b/airflow/api_fastapi/execution_api/schemas.py
@@ -119,3 +119,23 @@ class TIHeartbeatInfo(BaseModel):
 
     hostname: str
     pid: int
+
+
+class ConnectionResponse(BaseModel):
+    """Connection schema for responses with fields that are needed for 
Runtime."""
+
+    conn_id: str
+    conn_type: str
+    host: str | None
+    schema_: str | None = Field(alias="schema")
+    login: str | None
+    password: str | None
+    port: int | None
+    extra: str | None
+
+
+# TODO: This is a placeholder for Task Identity Token schema.
+class TIToken(BaseModel):
+    """Task Identity Token."""
+
+    ti_key: str
diff --git a/tests/api_fastapi/execution_api/routes/test_connection.py 
b/tests/api_fastapi/execution_api/routes/test_connection.py
new file mode 100644
index 00000000000..107fb8741ea
--- /dev/null
+++ b/tests/api_fastapi/execution_api/routes/test_connection.py
@@ -0,0 +1,107 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.models.connection import Connection
+
+pytestmark = pytest.mark.db_test
+
+
+class TestGetConnection:
+    def test_connection_get_from_db(self, client, session):
+        connection = Connection(
+            conn_id="test_conn",
+            conn_type="http",
+            description="description",
+            host="localhost",
+            login="root",
+            password="admin",
+            schema="http",
+            port=8080,
+            extra='{"x_secret": "testsecret", "y_secret": "test"}',
+        )
+
+        session.add(connection)
+        session.commit()
+
+        response = client.get("/execution/connection/test_conn")
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "conn_id": "test_conn",
+            "conn_type": "http",
+            "host": "localhost",
+            "login": "root",
+            "password": "admin",
+            "schema": "http",
+            "port": 8080,
+            "extra": '{"x_secret": "testsecret", "y_secret": "test"}',
+        }
+
+        # Remove connection
+        session.delete(connection)
+        session.commit()
+
+    @mock.patch.dict(
+        "os.environ",
+        {"AIRFLOW_CONN_TEST_CONN2": '{"uri": 
"http://root:admin@localhost:8080/https?headers=header"}'},
+    )
+    def test_connection_get_from_env_var(self, client, session):
+        response = client.get("/execution/connection/test_conn2")
+
+        assert response.status_code == 200
+        assert response.json() == {
+            "conn_id": "test_conn2",
+            "conn_type": "http",
+            "host": "localhost",
+            "login": "root",
+            "password": "admin",
+            "schema": "https",
+            "port": 8080,
+            "extra": '{"headers": "header"}',
+        }
+
+    def test_connection_get_not_found(self, client):
+        response = client.get("/execution/connection/non_existent_test_conn")
+
+        assert response.status_code == 404
+        assert response.json() == {
+            "detail": {
+                "message": "Connection with ID non_existent_test_conn not 
found",
+                "reason": "not_found",
+            }
+        }
+
+    def test_connection_get_access_denied(self, client):
+        with mock.patch(
+            
"airflow.api_fastapi.execution_api.routes.connections.has_connection_access", 
return_value=False
+        ):
+            response = client.get("/execution/connection/test_conn")
+
+        # Assert response status code and detail for access denied
+        assert response.status_code == 403
+        assert response.json() == {
+            "detail": {
+                "reason": "access_denied",
+                "message": "Task does not have access to connection test_conn",
+            }
+        }

Reply via email to