This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 01302a18229 AIP-72: Add "Get Variable" endpoint for Execution API 
(#43832)
01302a18229 is described below

commit 01302a1822910f77b90f23f7504fadd4c0d3f295
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Nov 8 23:55:46 2024 +0000

    AIP-72: Add "Get Variable" endpoint for Execution API (#43832)
    
    This commit introduces a new endpoint, 
`/execution/variables/{variable_key}`, in the Execution API to retrieve 
Variable details.
    
    Same as the Connections PR, it uses a placeholder `has_variable_access` 
function to validate task permissions for each request.
---
 airflow/api_fastapi/execution_api/datamodels.py    |  9 +++
 .../api_fastapi/execution_api/routes/__init__.py   | 11 ++--
 .../execution_api/routes/connections.py            |  6 +-
 airflow/api_fastapi/execution_api/routes/health.py |  4 +-
 .../execution_api/routes/task_instance.py          |  9 +--
 .../routes/{connections.py => variables.py}        | 45 +++++++------
 .../{test_connection.py => test_connections.py}    |  8 +--
 .../execution_api/routes/test_variables.py         | 77 ++++++++++++++++++++++
 8 files changed, 124 insertions(+), 45 deletions(-)

diff --git a/airflow/api_fastapi/execution_api/datamodels.py 
b/airflow/api_fastapi/execution_api/datamodels.py
index c61718bbef2..78dbca76bce 100644
--- a/airflow/api_fastapi/execution_api/datamodels.py
+++ b/airflow/api_fastapi/execution_api/datamodels.py
@@ -134,6 +134,15 @@ class ConnectionResponse(BaseModel):
     extra: str | None
 
 
+class VariableResponse(BaseModel):
+    """Variable schema for responses with fields that are needed for 
Runtime."""
+
+    model_config = ConfigDict(from_attributes=True)
+
+    key: str
+    val: str | None = Field(alias="value")
+
+
 # TODO: This is a placeholder for Task Identity Token schema.
 class TIToken(BaseModel):
     """Task Identity Token."""
diff --git a/airflow/api_fastapi/execution_api/routes/__init__.py 
b/airflow/api_fastapi/execution_api/routes/__init__.py
index c2ee885fab6..76479c96cf7 100644
--- a/airflow/api_fastapi/execution_api/routes/__init__.py
+++ b/airflow/api_fastapi/execution_api/routes/__init__.py
@@ -17,11 +17,10 @@
 from __future__ import annotations
 
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.execution_api.routes.connections import 
connection_router
-from airflow.api_fastapi.execution_api.routes.health import health_router
-from airflow.api_fastapi.execution_api.routes.task_instance import ti_router
+from airflow.api_fastapi.execution_api.routes import connections, health, 
task_instance, variables
 
 execution_api_router = AirflowRouter()
-execution_api_router.include_router(connection_router)
-execution_api_router.include_router(health_router)
-execution_api_router.include_router(ti_router)
+execution_api_router.include_router(connections.router, prefix="/connections", 
tags=["Connections"])
+execution_api_router.include_router(health.router, tags=["Health"])
+execution_api_router.include_router(task_instance.router, 
prefix="/task_instance", tags=["Task Instance"])
+execution_api_router.include_router(variables.router, prefix="/variables", 
tags=["Variables"])
diff --git a/airflow/api_fastapi/execution_api/routes/connections.py 
b/airflow/api_fastapi/execution_api/routes/connections.py
index 4e0c6eb007c..553cb0785d6 100644
--- a/airflow/api_fastapi/execution_api/routes/connections.py
+++ b/airflow/api_fastapi/execution_api/routes/connections.py
@@ -28,9 +28,7 @@ from airflow.exceptions import AirflowNotFoundException
 from airflow.models.connection import Connection
 
 # TODO: Add dependency on JWT token
-connection_router = AirflowRouter(
-    prefix="/connection",
-    tags=["Connection"],
+router = AirflowRouter(
     responses={status.HTTP_404_NOT_FOUND: {"description": "Connection not 
found"}},
 )
 
@@ -42,7 +40,7 @@ def get_task_token() -> datamodels.TIToken:
     return datamodels.TIToken(ti_key="test_key")
 
 
-@connection_router.get(
+@router.get(
     "/{connection_id}",
     responses={
         status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
diff --git a/airflow/api_fastapi/execution_api/routes/health.py 
b/airflow/api_fastapi/execution_api/routes/health.py
index c8d903815dc..7bf4c4a2de0 100644
--- a/airflow/api_fastapi/execution_api/routes/health.py
+++ b/airflow/api_fastapi/execution_api/routes/health.py
@@ -19,9 +19,9 @@ from __future__ import annotations
 
 from airflow.api_fastapi.common.router import AirflowRouter
 
-health_router = AirflowRouter(tags=["Health"])
+router = AirflowRouter()
 
 
-@health_router.get("/health")
+@router.get("/health")
 def health() -> dict:
     return {"status": "healthy"}
diff --git a/airflow/api_fastapi/execution_api/routes/task_instance.py 
b/airflow/api_fastapi/execution_api/routes/task_instance.py
index 4612b0c0425..3ef37013f89 100644
--- a/airflow/api_fastapi/execution_api/routes/task_instance.py
+++ b/airflow/api_fastapi/execution_api/routes/task_instance.py
@@ -35,16 +35,13 @@ from airflow.utils import timezone
 from airflow.utils.state import State
 
 # TODO: Add dependency on JWT token
-ti_router = AirflowRouter(
-    prefix="/task_instance",
-    tags=["Task Instance"],
-)
+router = AirflowRouter()
 
 
 log = logging.getLogger(__name__)
 
 
-@ti_router.patch(
+@router.patch(
     "/{task_instance_id}/state",
     status_code=status.HTTP_204_NO_CONTENT,
     # TODO: Add description to the operation
@@ -133,7 +130,7 @@ def ti_update_state(
         )
 
 
-@ti_router.put(
+@router.put(
     "/{task_instance_id}/heartbeat",
     status_code=status.HTTP_204_NO_CONTENT,
     responses={
diff --git a/airflow/api_fastapi/execution_api/routes/connections.py 
b/airflow/api_fastapi/execution_api/routes/variables.py
similarity index 64%
copy from airflow/api_fastapi/execution_api/routes/connections.py
copy to airflow/api_fastapi/execution_api/routes/variables.py
index 4e0c6eb007c..79df5678aca 100644
--- a/airflow/api_fastapi/execution_api/routes/connections.py
+++ b/airflow/api_fastapi/execution_api/routes/variables.py
@@ -24,14 +24,11 @@ from typing_extensions import Annotated
 
 from airflow.api_fastapi.common.router import AirflowRouter
 from airflow.api_fastapi.execution_api import datamodels
-from airflow.exceptions import AirflowNotFoundException
-from airflow.models.connection import Connection
+from airflow.models.variable import Variable
 
 # TODO: Add dependency on JWT token
-connection_router = AirflowRouter(
-    prefix="/connection",
-    tags=["Connection"],
-    responses={status.HTTP_404_NOT_FOUND: {"description": "Connection not 
found"}},
+router = AirflowRouter(
+    responses={status.HTTP_404_NOT_FOUND: {"description": "Variable not 
found"}},
 )
 
 log = logging.getLogger(__name__)
@@ -42,47 +39,49 @@ def get_task_token() -> datamodels.TIToken:
     return datamodels.TIToken(ti_key="test_key")
 
 
-@connection_router.get(
-    "/{connection_id}",
+@router.get(
+    "/{variable_key}",
     responses={
         status.HTTP_401_UNAUTHORIZED: {"description": "Unauthorized"},
-        status.HTTP_403_FORBIDDEN: {"description": "Task does not have access 
to the connection"},
+        status.HTTP_403_FORBIDDEN: {"description": "Task does not have access 
to the variable"},
     },
 )
-def get_connection(
-    connection_id: str,
+def get_variable(
+    variable_key: str,
     token: Annotated[datamodels.TIToken, Depends(get_task_token)],
-) -> datamodels.ConnectionResponse:
-    """Get an Airflow connection."""
-    if not has_connection_access(connection_id, token):
+) -> datamodels.VariableResponse:
+    """Get an Airflow Variable."""
+    if not has_variable_access(variable_key, token):
         raise HTTPException(
             status_code=status.HTTP_403_FORBIDDEN,
             detail={
                 "reason": "access_denied",
-                "message": f"Task does not have access to connection 
{connection_id}",
+                "message": f"Task does not have access to variable 
{variable_key}",
             },
         )
+
     try:
-        connection = Connection.get_connection_from_secrets(connection_id)
-    except AirflowNotFoundException:
+        variable_value = Variable.get(variable_key)
+    except KeyError:
         raise HTTPException(
             status.HTTP_404_NOT_FOUND,
             detail={
                 "reason": "not_found",
-                "message": f"Connection with ID {connection_id} not found",
+                "message": f"Variable with key '{variable_key}' not found",
             },
         )
-    return datamodels.ConnectionResponse.model_validate(connection, 
from_attributes=True)
+
+    return datamodels.VariableResponse(key=variable_key, value=variable_value)
 
 
-def has_connection_access(connection_id: str, token: datamodels.TIToken) -> 
bool:
-    """Check if the task has access to the connection."""
+def has_variable_access(variable_key: str, token: datamodels.TIToken) -> bool:
+    """Check if the task has access to the variable."""
     # TODO: Placeholder for actual implementation
 
     ti_key = token.ti_key
     log.debug(
-        "Checking access for task instance with key '%s' to connection '%s'",
+        "Checking access for task instance with key '%s' to variable '%s'",
         ti_key,
-        connection_id,
+        variable_key,
     )
     return True
diff --git a/tests/api_fastapi/execution_api/routes/test_connection.py 
b/tests/api_fastapi/execution_api/routes/test_connections.py
similarity index 92%
rename from tests/api_fastapi/execution_api/routes/test_connection.py
rename to tests/api_fastapi/execution_api/routes/test_connections.py
index 107fb8741ea..287a5bac801 100644
--- a/tests/api_fastapi/execution_api/routes/test_connection.py
+++ b/tests/api_fastapi/execution_api/routes/test_connections.py
@@ -43,7 +43,7 @@ class TestGetConnection:
         session.add(connection)
         session.commit()
 
-        response = client.get("/execution/connection/test_conn")
+        response = client.get("/execution/connections/test_conn")
 
         assert response.status_code == 200
         assert response.json() == {
@@ -66,7 +66,7 @@ class TestGetConnection:
         {"AIRFLOW_CONN_TEST_CONN2": '{"uri": 
"http://root:admin@localhost:8080/https?headers=header"}'},
     )
     def test_connection_get_from_env_var(self, client, session):
-        response = client.get("/execution/connection/test_conn2")
+        response = client.get("/execution/connections/test_conn2")
 
         assert response.status_code == 200
         assert response.json() == {
@@ -81,7 +81,7 @@ class TestGetConnection:
         }
 
     def test_connection_get_not_found(self, client):
-        response = client.get("/execution/connection/non_existent_test_conn")
+        response = client.get("/execution/connections/non_existent_test_conn")
 
         assert response.status_code == 404
         assert response.json() == {
@@ -95,7 +95,7 @@ class TestGetConnection:
         with mock.patch(
             
"airflow.api_fastapi.execution_api.routes.connections.has_connection_access", 
return_value=False
         ):
-            response = client.get("/execution/connection/test_conn")
+            response = client.get("/execution/connections/test_conn")
 
         # Assert response status code and detail for access denied
         assert response.status_code == 403
diff --git a/tests/api_fastapi/execution_api/routes/test_variables.py 
b/tests/api_fastapi/execution_api/routes/test_variables.py
new file mode 100644
index 00000000000..67247e4adb9
--- /dev/null
+++ b/tests/api_fastapi/execution_api/routes/test_variables.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.models.variable import Variable
+
+pytestmark = pytest.mark.db_test
+
+
+class TestGetVariable:
+    def test_variable_get_from_db(self, client, session):
+        Variable.set(key="var1", value="value", session=session)
+        session.commit()
+
+        response = client.get("/execution/variables/var1")
+
+        assert response.status_code == 200
+        assert response.json() == {"key": "var1", "value": "value"}
+
+        # Remove variable
+        Variable.delete(key="var1", session=session)
+        session.commit()
+
+    @mock.patch.dict(
+        "os.environ",
+        {"AIRFLOW_VAR_KEY1": "VALUE"},
+    )
+    def test_variable_get_from_env_var(self, client, session):
+        response = client.get("/execution/variables/key1")
+
+        assert response.status_code == 200
+        assert response.json() == {"key": "key1", "value": "VALUE"}
+
+    def test_variable_get_not_found(self, client):
+        response = client.get("/execution/variables/non_existent_var")
+
+        assert response.status_code == 404
+        assert response.json() == {
+            "detail": {
+                "message": "Variable with key 'non_existent_var' not found",
+                "reason": "not_found",
+            }
+        }
+
+    def test_variable_get_access_denied(self, client):
+        with mock.patch(
+            
"airflow.api_fastapi.execution_api.routes.variables.has_variable_access", 
return_value=False
+        ):
+            response = client.get("/execution/variables/key1")
+
+        # Assert response status code and detail for access denied
+        assert response.status_code == 403
+        assert response.json() == {
+            "detail": {
+                "reason": "access_denied",
+                "message": "Task does not have access to variable key1",
+            }
+        }

Reply via email to