kaxil commented on code in PR #67041:
URL: https://github.com/apache/airflow/pull/67041#discussion_r3270191732


##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow._shared.state import TaskScope
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.task_state import (
+    TaskStateBody,
+    TaskStateCollectionResponse,
+    TaskStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.task_state import TaskStateModel
+from airflow.state.metastore import MetastoreStateBackend
+
+task_state_router = AirflowRouter(
+    tags=["Task State"],
+    
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states",
+)
+
+
+def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) -> 
TaskScope:
+    return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id, 
map_index=map_index)
+
+
+@task_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def list_task_states(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateCollectionResponse:
+    """List all task state entries for a task instance."""
+    base = select(
+        TaskStateModel.key,
+        TaskStateModel.value,
+        TaskStateModel.updated_at,
+        TaskStateModel.expires_at,
+    ).where(
+        TaskStateModel.dag_id == dag_id,
+        TaskStateModel.run_id == dag_run_id,
+        TaskStateModel.task_id == task_id,
+        TaskStateModel.map_index == map_index,
+    )
+    paginated, total_entries = paginated_select(
+        statement=base, filters=[], order_by=None, offset=offset, limit=limit, 
session=session

Review Comment:
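   `paginated_select` is called with `order_by=None`, so the query applies 
`LIMIT/OFFSET` with no `ORDER BY`. Row order is then undefined, so rows can 
repeat or vanish between page requests. Pick a deterministic ordering (e.g. 
`TaskStateModel.key`, which is unique within a task instance's scope since 
`get_task_state` relies on `one_or_none()` for it) and apply it, either with 
`.order_by(...)` on `base` or by passing an appropriate sort param through 
`order_by`. `(updated_at, key)` also works per query, but pages can shift if 
entries are updated between requests.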



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow._shared.state import TaskScope
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.task_state import (
+    TaskStateBody,
+    TaskStateCollectionResponse,
+    TaskStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.task_state import TaskStateModel
+from airflow.state.metastore import MetastoreStateBackend
+
+task_state_router = AirflowRouter(
+    tags=["Task State"],
+    
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states",
+)
+
+
+def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) -> 
TaskScope:
+    return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id, 
map_index=map_index)
+
+
+@task_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def list_task_states(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateCollectionResponse:
+    """List all task state entries for a task instance."""
+    base = select(
+        TaskStateModel.key,
+        TaskStateModel.value,
+        TaskStateModel.updated_at,
+        TaskStateModel.expires_at,
+    ).where(
+        TaskStateModel.dag_id == dag_id,
+        TaskStateModel.run_id == dag_run_id,
+        TaskStateModel.task_id == task_id,
+        TaskStateModel.map_index == map_index,
+    )
+    paginated, total_entries = paginated_select(
+        statement=base, filters=[], order_by=None, offset=offset, limit=limit, 
session=session
+    )
+    rows = session.execute(paginated).all()
+    entries = [
+        TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, 
expires_at=r.expires_at)
+        for r in rows
+    ]
+    return TaskStateCollectionResponse(task_states=entries, 
total_entries=total_entries)
+
+
+@task_state_router.get(
+    "/{key}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def get_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateResponse:
+    """Get a single task state entry."""
+    row = session.execute(
+        select(
+            TaskStateModel.key,
+            TaskStateModel.value,
+            TaskStateModel.updated_at,
+            TaskStateModel.expires_at,
+        ).where(
+            TaskStateModel.dag_id == dag_id,
+            TaskStateModel.run_id == dag_run_id,
+            TaskStateModel.task_id == task_id,
+            TaskStateModel.map_index == map_index,
+            TaskStateModel.key == key,
+        )
+    ).one_or_none()
+    if row is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Task state key {key!r} not found",
+        )
+    return TaskStateResponse(
+        key=row.key, value=row.value, updated_at=row.updated_at, 
expires_at=row.expires_at
+    )
+
+
+@task_state_router.put(
+    "/{key}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def set_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    body: TaskStateBody,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+    """Set a task state value. Creates or overwrites the key."""
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    try:
+        MetastoreStateBackend().set(scope, key, body.value, session=session)

Review Comment:
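   These routes hardcode `MetastoreStateBackend()` instead of going through 
`airflow.state.get_state_backend()`. If an operator configures `[state_store] 
backend = my_custom.Backend`, the SDK and scheduler use the custom backend, but 
this API still writes to the metastore, so the two sides drift apart silently. 
Same issue at lines 163 and 182 here, and in `asset_state.py` at 109/124/138. 
The read routes (`list_task_states`, `get_task_state`) have a related problem: 
they query `TaskStateModel` directly, so with a custom backend they would not 
see what the SDK wrote. Consider resolving the backend once via 
`get_state_backend()` and routing writes (and, where the backend interface 
allows, reads) through it.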



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow._shared.state import TaskScope
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.task_state import (
+    TaskStateBody,
+    TaskStateCollectionResponse,
+    TaskStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.task_state import TaskStateModel
+from airflow.state.metastore import MetastoreStateBackend
+
+task_state_router = AirflowRouter(
+    tags=["Task State"],
+    
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states",
+)
+
+
+def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) -> 
TaskScope:
+    return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id, 
map_index=map_index)
+
+
+@task_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def list_task_states(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateCollectionResponse:
+    """List all task state entries for a task instance."""
+    base = select(
+        TaskStateModel.key,
+        TaskStateModel.value,
+        TaskStateModel.updated_at,
+        TaskStateModel.expires_at,
+    ).where(
+        TaskStateModel.dag_id == dag_id,
+        TaskStateModel.run_id == dag_run_id,
+        TaskStateModel.task_id == task_id,
+        TaskStateModel.map_index == map_index,
+    )
+    paginated, total_entries = paginated_select(
+        statement=base, filters=[], order_by=None, offset=offset, limit=limit, 
session=session
+    )
+    rows = session.execute(paginated).all()
+    entries = [
+        TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, 
expires_at=r.expires_at)
+        for r in rows
+    ]
+    return TaskStateCollectionResponse(task_states=entries, 
total_entries=total_entries)
+
+
+@task_state_router.get(
+    "/{key}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def get_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateResponse:
+    """Get a single task state entry."""
+    row = session.execute(
+        select(
+            TaskStateModel.key,
+            TaskStateModel.value,
+            TaskStateModel.updated_at,
+            TaskStateModel.expires_at,
+        ).where(
+            TaskStateModel.dag_id == dag_id,
+            TaskStateModel.run_id == dag_run_id,
+            TaskStateModel.task_id == task_id,
+            TaskStateModel.map_index == map_index,
+            TaskStateModel.key == key,
+        )
+    ).one_or_none()
+    if row is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Task state key {key!r} not found",
+        )
+    return TaskStateResponse(
+        key=row.key, value=row.value, updated_at=row.updated_at, 
expires_at=row.expires_at
+    )
+
+
+@task_state_router.put(
+    "/{key}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def set_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    body: TaskStateBody,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+    """Set a task state value. Creates or overwrites the key."""
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    try:
+        MetastoreStateBackend().set(scope, key, body.value, session=session)

Review Comment:
   `set_task_state` validates the DagRun (via `_set_task_state` raising 
`ValueError`) but never checks `task_id` against the DAG. A typo'd `task_id` 
happily creates an orphan row that nothing else owns. XCom's create path calls 
`dag.get_task(task_id)` and returns 404 when the task does not exist. Worth 
doing the same here so callers can't silently poison the table.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow._shared.state import TaskScope
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.task_state import (
+    TaskStateBody,
+    TaskStateCollectionResponse,
+    TaskStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.task_state import TaskStateModel
+from airflow.state.metastore import MetastoreStateBackend
+
+task_state_router = AirflowRouter(
+    tags=["Task State"],
+    
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states",
+)
+
+
+def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) -> 
TaskScope:
+    return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id, 
map_index=map_index)
+
+
+@task_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def list_task_states(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateCollectionResponse:
+    """List all task state entries for a task instance."""
+    base = select(
+        TaskStateModel.key,
+        TaskStateModel.value,
+        TaskStateModel.updated_at,
+        TaskStateModel.expires_at,
+    ).where(
+        TaskStateModel.dag_id == dag_id,
+        TaskStateModel.run_id == dag_run_id,
+        TaskStateModel.task_id == task_id,
+        TaskStateModel.map_index == map_index,
+    )
+    paginated, total_entries = paginated_select(
+        statement=base, filters=[], order_by=None, offset=offset, limit=limit, 
session=session
+    )
+    rows = session.execute(paginated).all()
+    entries = [
+        TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, 
expires_at=r.expires_at)
+        for r in rows
+    ]
+    return TaskStateCollectionResponse(task_states=entries, 
total_entries=total_entries)
+
+
+@task_state_router.get(
+    "/{key}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def get_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateResponse:
+    """Get a single task state entry."""
+    row = session.execute(
+        select(
+            TaskStateModel.key,
+            TaskStateModel.value,
+            TaskStateModel.updated_at,
+            TaskStateModel.expires_at,
+        ).where(
+            TaskStateModel.dag_id == dag_id,
+            TaskStateModel.run_id == dag_run_id,
+            TaskStateModel.task_id == task_id,
+            TaskStateModel.map_index == map_index,
+            TaskStateModel.key == key,
+        )
+    ).one_or_none()
+    if row is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Task state key {key!r} not found",
+        )
+    return TaskStateResponse(
+        key=row.key, value=row.value, updated_at=row.updated_at, 
expires_at=row.expires_at
+    )
+
+
+@task_state_router.put(
+    "/{key}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="PUT", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def set_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    body: TaskStateBody,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+    """Set a task state value. Creates or overwrites the key."""
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    try:
+        MetastoreStateBackend().set(scope, key, body.value, session=session)
+    except ValueError as e:
+        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
detail=str(e)) from e
+
+
+@task_state_router.delete(
+    "/{key}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="DELETE", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def delete_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    key: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+    """Delete a single task state key. No-op if the key does not exist."""
+    scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+    MetastoreStateBackend().delete(scope, key, session=session)
+
+
+@task_state_router.delete(
+    "",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="DELETE", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def clear_task_state(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+    all_map_indices: Annotated[bool, Query()] = False,

Review Comment:
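   When `all_map_indices=true`, the `map_index` query param is silently 
ignored. A caller sending `?map_index=3&all_map_indices=true` presumably 
expects only map index 3 to be cleared, but every map index gets cleared 
instead. Either reject the combination with 422 or make it clear in the OpenAPI 
description that `all_map_indices=true` overrides `map_index`. Rejecting it 
requires telling an explicit `map_index` apart from the default `-1`, e.g. by 
typing it as `int | None` with a `None` default and treating `None` as `-1`.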



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException, status
+from sqlalchemy import select
+
+from airflow._shared.state import AssetScope
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.asset_state import (
+    AssetStateBody,
+    AssetStateCollectionResponse,
+    AssetStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_asset
+from airflow.models.asset_state import AssetStateModel
+from airflow.state.metastore import MetastoreStateBackend
+
+asset_state_router = AirflowRouter(
+    tags=["Asset State"],
+    prefix="/assets/{asset_id}/states",
+)
+
+
+@asset_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def list_asset_states(
+    asset_id: int,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+) -> AssetStateCollectionResponse:
+    """List all state entries for an asset."""
+    base = select(
+        AssetStateModel.key,
+        AssetStateModel.value,
+        AssetStateModel.updated_at,
+    ).where(AssetStateModel.asset_id == asset_id)
+    paginated, total_entries = paginated_select(
+        statement=base, filters=[], order_by=None, offset=offset, limit=limit, 
session=session

Review Comment:
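   Same `order_by=None` issue as the task state list. With no `ORDER BY`, 
paginating asset states isn't stable. Sort on `AssetStateModel.key` (unique per 
asset, so the order is deterministic) or on `(updated_at, key)` (though that 
order can shift if entries are updated between page requests) to give callers 
a stable page sequence.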



##########
airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py:
##########
@@ -0,0 +1,43 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+
+
+class TaskStateResponse(BaseModel):
+    """A single task state key/value pair with metadata."""
+
+    key: str
+    value: str
+    updated_at: datetime
+    expires_at: datetime | None
+
+
+class TaskStateCollectionResponse(BaseModel):
+    """All task state entries for a task instance."""
+
+    task_states: list[TaskStateResponse]
+    total_entries: int
+
+
+class TaskStateBody(StrictBaseModel):
+    """Request body for setting a task state value."""
+
+    value: str

Review Comment:
   `value` is an unbounded `str`. There's nothing stopping a client from 
PUT-ing a 100MB string into a single key. The column is `Text`/`MEDIUMTEXT`, so 
the DB will accept very large values (up to 16 MB for MySQL `MEDIUMTEXT`, far 
more on Postgres) and fail with an opaque driver error beyond that. The API 
should impose a sensible upper bound (e.g. `Field(max_length=...)`) so this 
can't become a denial-of-service vector. Same applies to `AssetStateBody.value`.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py:
##########
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from fastapi import Depends, HTTPException, status
+from sqlalchemy import select
+
+from airflow._shared.state import AssetScope
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.asset_state import (
+    AssetStateBody,
+    AssetStateCollectionResponse,
+    AssetStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_asset
+from airflow.models.asset_state import AssetStateModel
+from airflow.state.metastore import MetastoreStateBackend
+
+asset_state_router = AirflowRouter(
+    tags=["Asset State"],
+    prefix="/assets/{asset_id}/states",
+)
+
+
+@asset_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def list_asset_states(
+    asset_id: int,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+) -> AssetStateCollectionResponse:
+    """List all state entries for an asset."""
+    base = select(
+        AssetStateModel.key,
+        AssetStateModel.value,
+        AssetStateModel.updated_at,
+    ).where(AssetStateModel.asset_id == asset_id)
+    paginated, total_entries = paginated_select(
+        statement=base, filters=[], order_by=None, offset=offset, limit=limit, 
session=session
+    )
+    rows = session.execute(paginated).all()
+    entries = [AssetStateResponse(key=r.key, value=r.value, 
updated_at=r.updated_at) for r in rows]
+    return AssetStateCollectionResponse(asset_states=entries, 
total_entries=total_entries)
+
+
+@asset_state_router.get(
+    "/{key}",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def get_asset_state(
+    asset_id: int,
+    key: str,
+    session: SessionDep,
+) -> AssetStateResponse:
+    """Get a single asset state entry."""
+    row = session.execute(
+        select(
+            AssetStateModel.key,
+            AssetStateModel.value,
+            AssetStateModel.updated_at,
+        ).where(
+            AssetStateModel.asset_id == asset_id,
+            AssetStateModel.key == key,
+        )
+    ).one_or_none()
+    if row is None:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Asset state key {key!r} not found",
+        )
+    return AssetStateResponse(key=row.key, value=row.value, 
updated_at=row.updated_at)
+
+
+@asset_state_router.put(
+    "/{key}",
+    status_code=status.HTTP_204_NO_CONTENT,
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_asset(method="PUT"))],
+)
+def set_asset_state(
+    asset_id: int,
+    key: str,
+    body: AssetStateBody,
+    session: SessionDep,
+) -> None:
+    """Set an asset state value. Creates or overwrites the key."""
+    MetastoreStateBackend().set(AssetScope(asset_id=asset_id), key, 
body.value, session=session)

Review Comment:
   None of the asset endpoints check that `asset_id` actually exists. `set` 
will violate the FK (where enforced) and return 500 on non-existent IDs; 
`delete`/`clear` silently return 204; `list` returns an empty collection and 
`get` returns 404, with no way to tell "asset missing" from "asset has no 
state". A `SELECT 1 FROM asset WHERE id = :asset_id` (or a `get_asset_or_404` 
helper) at the top of each route would fix the 500 and make the contract 
consistent.



##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py:
##########
@@ -0,0 +1,182 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow._shared.state import TaskScope
+from airflow.api_fastapi.auth.managers.models.resource_details import 
DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.task_state import (
+    TaskStateBody,
+    TaskStateCollectionResponse,
+    TaskStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.task_state import TaskStateModel
+from airflow.state.metastore import MetastoreStateBackend
+
+task_state_router = AirflowRouter(
+    tags=["Task State"],
+    
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states",
+)
+
+
+def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) -> 
TaskScope:
+    return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id, 
map_index=map_index)
+
+
+@task_state_router.get(
+    "",
+    responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+    dependencies=[Depends(requires_access_dag(method="GET", 
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def list_task_states(
+    dag_id: str,
+    dag_run_id: str,
+    task_id: str,
+    limit: QueryLimit,
+    offset: QueryOffset,
+    session: SessionDep,
+    map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateCollectionResponse:
+    """List all task state entries for a task instance."""
+    base = select(
+        TaskStateModel.key,
+        TaskStateModel.value,
+        TaskStateModel.updated_at,
+        TaskStateModel.expires_at,
+    ).where(
+        TaskStateModel.dag_id == dag_id,
+        TaskStateModel.run_id == dag_run_id,
+        TaskStateModel.task_id == task_id,
+        TaskStateModel.map_index == map_index,
+    )
+    paginated, total_entries = paginated_select(
+        statement=base, filters=[], order_by=None, offset=offset, limit=limit, 
session=session
+    )
+    rows = session.execute(paginated).all()
+    entries = [
+        TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, 
expires_at=r.expires_at)
+        for r in rows
+    ]
+    return TaskStateCollectionResponse(task_states=entries, 
total_entries=total_entries)
+
+
+@task_state_router.get(
+    "/{key}",

Review Comment:
   The `{key}` path param doesn't match slashes, so a request for a key like 
`workflow/step_1` will return 404 even though the backend would happily store it. 
XCom uses `{xcom_key:path}` for exactly this reason. Either declare the parameter 
as `{key:path}`, or document and enforce the no-slash restriction in 
`TaskStateBody`/server-side validation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to