This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 5c442d378dc Modularize datamodels in Execution API (#44068)
5c442d378dc is described below

commit 5c442d378dc3d06c2a18cf7b7f6f2777e22da6f5
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Nov 15 19:08:39 2024 +0000

    Modularize datamodels in Execution API (#44068)
    
    Split the `datamodels` module to granular mods
---
 .../{deps.py => datamodels/__init__.py}            | 16 --------
 .../{deps.py => datamodels/connection.py}          | 21 +++++-----
 .../{datamodels.py => datamodels/taskinstance.py}  | 48 +---------------------
 .../execution_api/{deps.py => datamodels/token.py} | 15 +++----
 .../{deps.py => datamodels/variable.py}            | 15 +++----
 .../execution_api/{deps.py => datamodels/xcom.py}  | 16 ++++----
 airflow/api_fastapi/execution_api/deps.py          |  8 ++--
 .../execution_api/routes/connections.py            | 10 +++--
 .../execution_api/routes/task_instances.py         | 15 ++++---
 .../api_fastapi/execution_api/routes/variables.py  | 13 +++---
 airflow/api_fastapi/execution_api/routes/xcoms.py  | 10 +++--
 11 files changed, 63 insertions(+), 124 deletions(-)

diff --git a/airflow/api_fastapi/execution_api/deps.py 
b/airflow/api_fastapi/execution_api/datamodels/__init__.py
similarity index 63%
copy from airflow/api_fastapi/execution_api/deps.py
copy to airflow/api_fastapi/execution_api/datamodels/__init__.py
index 4564324668a..13a83393a91 100644
--- a/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow/api_fastapi/execution_api/datamodels/__init__.py
@@ -14,19 +14,3 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
-from __future__ import annotations
-
-from typing import Annotated
-
-from fastapi import Depends
-
-from airflow.api_fastapi.execution_api import datamodels
-
-
-def get_task_token() -> datamodels.TIToken:
-    """TODO: Placeholder for task identity authentication. This should be 
replaced with actual JWT decoding and validation."""
-    return datamodels.TIToken(ti_key="test_key")
-
-
-TokenDep = Annotated[datamodels.TIToken, Depends(get_task_token)]
diff --git a/airflow/api_fastapi/execution_api/deps.py 
b/airflow/api_fastapi/execution_api/datamodels/connection.py
similarity index 67%
copy from airflow/api_fastapi/execution_api/deps.py
copy to airflow/api_fastapi/execution_api/datamodels/connection.py
index 4564324668a..f3c67895298 100644
--- a/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow/api_fastapi/execution_api/datamodels/connection.py
@@ -17,16 +17,17 @@
 
 from __future__ import annotations
 
-from typing import Annotated
+from pydantic import BaseModel, Field
 
-from fastapi import Depends
 
-from airflow.api_fastapi.execution_api import datamodels
+class ConnectionResponse(BaseModel):
+    """Connection schema for responses with fields that are needed for 
Runtime."""
 
-
-def get_task_token() -> datamodels.TIToken:
-    """TODO: Placeholder for task identity authentication. This should be 
replaced with actual JWT decoding and validation."""
-    return datamodels.TIToken(ti_key="test_key")
-
-
-TokenDep = Annotated[datamodels.TIToken, Depends(get_task_token)]
+    conn_id: str
+    conn_type: str
+    host: str | None
+    schema_: str | None = Field(alias="schema")
+    login: str | None
+    password: str | None
+    port: int | None
+    extra: str | None
diff --git a/airflow/api_fastapi/execution_api/datamodels.py 
b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
similarity index 76%
rename from airflow/api_fastapi/execution_api/datamodels.py
rename to airflow/api_fastapi/execution_api/datamodels/taskinstance.py
index ec8be531e10..db63dc3a8db 100644
--- a/airflow/api_fastapi/execution_api/datamodels.py
+++ b/airflow/api_fastapi/execution_api/datamodels/taskinstance.py
@@ -17,16 +17,9 @@
 
 from __future__ import annotations
 
-from typing import Annotated, Any, Literal, Union
+from typing import Annotated, Literal, Union
 
-from pydantic import (
-    BaseModel,
-    ConfigDict,
-    Discriminator,
-    Field,
-    Tag,
-    WithJsonSchema,
-)
+from pydantic import BaseModel, ConfigDict, Discriminator, Tag, WithJsonSchema
 
 from airflow.api_fastapi.common.types import UtcDateTime
 from airflow.utils.state import IntermediateTIState, TaskInstanceState as 
TIState, TerminalTIState
@@ -104,40 +97,3 @@ class TIHeartbeatInfo(BaseModel):
 
     hostname: str
     pid: int
-
-
-class ConnectionResponse(BaseModel):
-    """Connection schema for responses with fields that are needed for 
Runtime."""
-
-    conn_id: str
-    conn_type: str
-    host: str | None
-    schema_: str | None = Field(alias="schema")
-    login: str | None
-    password: str | None
-    port: int | None
-    extra: str | None
-
-
-class VariableResponse(BaseModel):
-    """Variable schema for responses with fields that are needed for 
Runtime."""
-
-    model_config = ConfigDict(from_attributes=True)
-
-    key: str
-    val: str | None = Field(alias="value")
-
-
-class XComResponse(BaseModel):
-    """XCom schema for responses with fields that are needed for Runtime."""
-
-    key: str
-    value: Any
-    """The returned XCom value in a JSON-compatible format."""
-
-
-# TODO: This is a placeholder for Task Identity Token schema.
-class TIToken(BaseModel):
-    """Task Identity Token."""
-
-    ti_key: str
diff --git a/airflow/api_fastapi/execution_api/deps.py 
b/airflow/api_fastapi/execution_api/datamodels/token.py
similarity index 67%
copy from airflow/api_fastapi/execution_api/deps.py
copy to airflow/api_fastapi/execution_api/datamodels/token.py
index 4564324668a..7086c39813e 100644
--- a/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow/api_fastapi/execution_api/datamodels/token.py
@@ -17,16 +17,11 @@
 
 from __future__ import annotations
 
-from typing import Annotated
+from pydantic import BaseModel
 
-from fastapi import Depends
 
-from airflow.api_fastapi.execution_api import datamodels
+# TODO: This is a placeholder for Task Identity Token schema.
+class TIToken(BaseModel):
+    """Task Identity Token."""
 
-
-def get_task_token() -> datamodels.TIToken:
-    """TODO: Placeholder for task identity authentication. This should be 
replaced with actual JWT decoding and validation."""
-    return datamodels.TIToken(ti_key="test_key")
-
-
-TokenDep = Annotated[datamodels.TIToken, Depends(get_task_token)]
+    ti_key: str
diff --git a/airflow/api_fastapi/execution_api/deps.py 
b/airflow/api_fastapi/execution_api/datamodels/variable.py
similarity index 67%
copy from airflow/api_fastapi/execution_api/deps.py
copy to airflow/api_fastapi/execution_api/datamodels/variable.py
index 4564324668a..6819286f54b 100644
--- a/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow/api_fastapi/execution_api/datamodels/variable.py
@@ -17,16 +17,13 @@
 
 from __future__ import annotations
 
-from typing import Annotated
+from pydantic import BaseModel, ConfigDict, Field
 
-from fastapi import Depends
 
-from airflow.api_fastapi.execution_api import datamodels
+class VariableResponse(BaseModel):
+    """Variable schema for responses with fields that are needed for 
Runtime."""
 
+    model_config = ConfigDict(from_attributes=True)
 
-def get_task_token() -> datamodels.TIToken:
-    """TODO: Placeholder for task identity authentication. This should be 
replaced with actual JWT decoding and validation."""
-    return datamodels.TIToken(ti_key="test_key")
-
-
-TokenDep = Annotated[datamodels.TIToken, Depends(get_task_token)]
+    key: str
+    val: str | None = Field(alias="value")
diff --git a/airflow/api_fastapi/execution_api/deps.py 
b/airflow/api_fastapi/execution_api/datamodels/xcom.py
similarity index 67%
copy from airflow/api_fastapi/execution_api/deps.py
copy to airflow/api_fastapi/execution_api/datamodels/xcom.py
index 4564324668a..6fb6c14629e 100644
--- a/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow/api_fastapi/execution_api/datamodels/xcom.py
@@ -17,16 +17,14 @@
 
 from __future__ import annotations
 
-from typing import Annotated
+from typing import Any
 
-from fastapi import Depends
+from pydantic import BaseModel
 
-from airflow.api_fastapi.execution_api import datamodels
 
+class XComResponse(BaseModel):
+    """XCom schema for responses with fields that are needed for Runtime."""
 
-def get_task_token() -> datamodels.TIToken:
-    """TODO: Placeholder for task identity authentication. This should be 
replaced with actual JWT decoding and validation."""
-    return datamodels.TIToken(ti_key="test_key")
-
-
-TokenDep = Annotated[datamodels.TIToken, Depends(get_task_token)]
+    key: str
+    value: Any
+    """The returned XCom value in a JSON-compatible format."""
diff --git a/airflow/api_fastapi/execution_api/deps.py 
b/airflow/api_fastapi/execution_api/deps.py
index 4564324668a..9e409bd3d6c 100644
--- a/airflow/api_fastapi/execution_api/deps.py
+++ b/airflow/api_fastapi/execution_api/deps.py
@@ -21,12 +21,12 @@ from typing import Annotated
 
 from fastapi import Depends
 
-from airflow.api_fastapi.execution_api import datamodels
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
 
 
-def get_task_token() -> datamodels.TIToken:
+def get_task_token() -> TIToken:
     """TODO: Placeholder for task identity authentication. This should be 
replaced with actual JWT decoding and validation."""
-    return datamodels.TIToken(ti_key="test_key")
+    return TIToken(ti_key="test_key")
 
 
-TokenDep = Annotated[datamodels.TIToken, Depends(get_task_token)]
+TokenDep = Annotated[TIToken, Depends(get_task_token)]
diff --git a/airflow/api_fastapi/execution_api/routes/connections.py 
b/airflow/api_fastapi/execution_api/routes/connections.py
index d31cfbaeb9d..86f94f5ef3f 100644
--- a/airflow/api_fastapi/execution_api/routes/connections.py
+++ b/airflow/api_fastapi/execution_api/routes/connections.py
@@ -22,7 +22,9 @@ import logging
 from fastapi import HTTPException, status
 
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.execution_api import datamodels, deps
+from airflow.api_fastapi.execution_api import deps
+from airflow.api_fastapi.execution_api.datamodels.connection import 
ConnectionResponse
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
 from airflow.exceptions import AirflowNotFoundException
 from airflow.models.connection import Connection
 
@@ -44,7 +46,7 @@ log = logging.getLogger(__name__)
 def get_connection(
     connection_id: str,
     token: deps.TokenDep,
-) -> datamodels.ConnectionResponse:
+) -> ConnectionResponse:
     """Get an Airflow connection."""
     if not has_connection_access(connection_id, token):
         raise HTTPException(
@@ -64,10 +66,10 @@ def get_connection(
                 "message": f"Connection with ID {connection_id} not found",
             },
         )
-    return datamodels.ConnectionResponse.model_validate(connection, 
from_attributes=True)
+    return ConnectionResponse.model_validate(connection, from_attributes=True)
 
 
-def has_connection_access(connection_id: str, token: datamodels.TIToken) -> 
bool:
+def has_connection_access(connection_id: str, token: TIToken) -> bool:
     """Check if the task has access to the connection."""
     # TODO: Placeholder for actual implementation
 
diff --git a/airflow/api_fastapi/execution_api/routes/task_instances.py 
b/airflow/api_fastapi/execution_api/routes/task_instances.py
index 97723ffc4a8..3adbd51ff2a 100644
--- a/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -29,7 +29,12 @@ from sqlalchemy.sql import select
 
 from airflow.api_fastapi.common.db.common import get_session
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.execution_api import datamodels
+from airflow.api_fastapi.execution_api.datamodels.taskinstance import (
+    TIEnterRunningPayload,
+    TIHeartbeatInfo,
+    TIStateUpdate,
+    TITerminalStatePayload,
+)
 from airflow.models.taskinstance import TaskInstance as TI
 from airflow.utils import timezone
 from airflow.utils.state import State
@@ -55,7 +60,7 @@ log = logging.getLogger(__name__)
 )
 def ti_update_state(
     task_instance_id: UUID,
-    ti_patch_payload: Annotated[datamodels.TIStateUpdate, Body()],
+    ti_patch_payload: Annotated[TIStateUpdate, Body()],
     session: Annotated[Session, Depends(get_session)],
 ):
     """
@@ -85,7 +90,7 @@ def ti_update_state(
 
     query = update(TI).where(TI.id == ti_id_str).values(data)
 
-    if isinstance(ti_patch_payload, datamodels.TIEnterRunningPayload):
+    if isinstance(ti_patch_payload, TIEnterRunningPayload):
         if previous_state != State.QUEUED:
             log.warning(
                 "Can not start Task Instance ('%s') in invalid state: %s",
@@ -115,7 +120,7 @@ def ti_update_state(
             pid=ti_patch_payload.pid,
             state=State.RUNNING,
         )
-    elif isinstance(ti_patch_payload, datamodels.TITerminalStatePayload):
+    elif isinstance(ti_patch_payload, TITerminalStatePayload):
         query = TI.duration_expression_update(ti_patch_payload.end_date, 
query, session.bind)
 
     # TODO: Replace this with FastAPI's Custom Exception handling:
@@ -143,7 +148,7 @@ def ti_update_state(
 )
 def ti_heartbeat(
     task_instance_id: UUID,
-    ti_payload: datamodels.TIHeartbeatInfo,
+    ti_payload: TIHeartbeatInfo,
     session: Annotated[Session, Depends(get_session)],
 ):
     """Update the heartbeat of a TaskInstance to mark it as alive & still 
running."""
diff --git a/airflow/api_fastapi/execution_api/routes/variables.py 
b/airflow/api_fastapi/execution_api/routes/variables.py
index 1ecc7480ee3..e8e2012e8d1 100644
--- a/airflow/api_fastapi/execution_api/routes/variables.py
+++ b/airflow/api_fastapi/execution_api/routes/variables.py
@@ -22,7 +22,9 @@ import logging
 from fastapi import HTTPException, status
 
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.execution_api import datamodels, deps
+from airflow.api_fastapi.execution_api import deps
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.datamodels.variable import 
VariableResponse
 from airflow.models.variable import Variable
 
 # TODO: Add dependency on JWT token
@@ -40,10 +42,7 @@ log = logging.getLogger(__name__)
         status.HTTP_403_FORBIDDEN: {"description": "Task does not have access 
to the variable"},
     },
 )
-def get_variable(
-    variable_key: str,
-    token: deps.TokenDep,
-) -> datamodels.VariableResponse:
+def get_variable(variable_key: str, token: deps.TokenDep) -> VariableResponse:
     """Get an Airflow Variable."""
     if not has_variable_access(variable_key, token):
         raise HTTPException(
@@ -65,10 +64,10 @@ def get_variable(
             },
         )
 
-    return datamodels.VariableResponse(key=variable_key, value=variable_value)
+    return VariableResponse(key=variable_key, value=variable_value)
 
 
-def has_variable_access(variable_key: str, token: datamodels.TIToken) -> bool:
+def has_variable_access(variable_key: str, token: TIToken) -> bool:
     """Check if the task has access to the variable."""
     # TODO: Placeholder for actual implementation
 
diff --git a/airflow/api_fastapi/execution_api/routes/xcoms.py 
b/airflow/api_fastapi/execution_api/routes/xcoms.py
index 0f0a04ed269..083947923dc 100644
--- a/airflow/api_fastapi/execution_api/routes/xcoms.py
+++ b/airflow/api_fastapi/execution_api/routes/xcoms.py
@@ -26,7 +26,9 @@ from sqlalchemy.orm import Session
 
 from airflow.api_fastapi.common.db.common import get_session
 from airflow.api_fastapi.common.router import AirflowRouter
-from airflow.api_fastapi.execution_api import datamodels, deps
+from airflow.api_fastapi.execution_api import deps
+from airflow.api_fastapi.execution_api.datamodels.token import TIToken
+from airflow.api_fastapi.execution_api.datamodels.xcom import XComResponse
 from airflow.models.xcom import BaseXCom
 
 # TODO: Add dependency on JWT token
@@ -52,7 +54,7 @@ def get_xcom(
     token: deps.TokenDep,
     session: Annotated[Session, Depends(get_session)],
     map_index: Annotated[int, Query()] = -1,
-) -> datamodels.XComResponse:
+) -> XComResponse:
     """Get an Airflow XCom from database - not other XCom Backends."""
     if not has_xcom_access(key, token):
         raise HTTPException(
@@ -100,10 +102,10 @@ def get_xcom(
             },
         )
 
-    return datamodels.XComResponse(key=key, value=xcom_value)
+    return XComResponse(key=key, value=xcom_value)
 
 
-def has_xcom_access(xcom_key: str, token: datamodels.TIToken) -> bool:
+def has_xcom_access(xcom_key: str, token: TIToken) -> bool:
     """Check if the task has access to the XCom."""
     # TODO: Placeholder for actual implementation
 

Reply via email to