This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new f9e4e6efd10 AIP-103: Add Core API endpoints for task state and asset
state (#67041)
f9e4e6efd10 is described below
commit f9e4e6efd10bcf99cabf5455b50574a1b64078c5
Author: Amogh Desai <[email protected]>
AuthorDate: Wed May 20 16:18:21 2026 +0530
AIP-103: Add Core API endpoints for task state and asset state (#67041)
---
.../api_fastapi/core_api/datamodels/asset_state.py | 44 ++
.../api_fastapi/core_api/datamodels/task_state.py | 45 ++
.../core_api/openapi/v2-rest-api-generated.yaml | 744 +++++++++++++++++++++
.../api_fastapi/core_api/routes/public/__init__.py | 4 +
.../core_api/routes/public/asset_state.py | 163 +++++
.../core_api/routes/public/task_state.py | 210 ++++++
.../src/airflow/ui/openapi-gen/queries/common.ts | 44 +-
.../ui/openapi-gen/queries/ensureQueryData.ts | 70 +-
.../src/airflow/ui/openapi-gen/queries/prefetch.ts | 70 +-
.../src/airflow/ui/openapi-gen/queries/queries.ts | 201 +++++-
.../src/airflow/ui/openapi-gen/queries/suspense.ts | 70 +-
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 126 ++++
.../ui/openapi-gen/requests/services.gen.ts | 321 ++++++++-
.../airflow/ui/openapi-gen/requests/types.gen.ts | 395 +++++++++++
.../core_api/routes/public/test_asset_state.py | 255 +++++++
.../core_api/routes/public/test_task_state.py | 293 ++++++++
.../src/airflowctl/api/datamodels/generated.py | 61 ++
17 files changed, 3109 insertions(+), 7 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
new file mode 100644
index 00000000000..379d7802909
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
@@ -0,0 +1,44 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from pydantic import Field
+
+from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+
+
+class AssetStateResponse(BaseModel):
+ """A single asset state key/value pair with metadata."""
+
+ key: str
+ value: str
+ updated_at: datetime
+
+
+class AssetStateCollectionResponse(BaseModel):
+ """All asset state entries for an asset."""
+
+ asset_states: list[AssetStateResponse]
+ total_entries: int
+
+
+class AssetStateBody(StrictBaseModel):
+ """Request body for setting an asset state value."""
+
+ value: str = Field(max_length=65535)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
new file mode 100644
index 00000000000..856de74a087
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
@@ -0,0 +1,45 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+from pydantic import Field
+
+from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+
+
+class TaskStateResponse(BaseModel):
+ """A single task state key/value pair with metadata."""
+
+ key: str
+ value: str
+ updated_at: datetime
+ expires_at: datetime | None
+
+
+class TaskStateCollectionResponse(BaseModel):
+ """All task state entries for a task instance."""
+
+ task_states: list[TaskStateResponse]
+ total_entries: int
+
+
+class TaskStateBody(StrictBaseModel):
+ """Request body for setting a task state value."""
+
+ value: str = Field(max_length=65535)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 880860ee801..a4434151f3d 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -5529,6 +5529,649 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
+ /api/v2/assets/{asset_id}/states:
+ get:
+ tags:
+ - Asset State
+ summary: List Asset States
+ description: List all state entries for an asset.
+ operationId: list_asset_states
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: asset_id
+ in: path
+ required: true
+ schema:
+ type: integer
+ title: Asset Id
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 50
+ title: Limit
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 0
+ title: Offset
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetStateCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ delete:
+ tags:
+ - Asset State
+ summary: Clear Asset State
+ description: Delete all state keys for an asset.
+ operationId: clear_asset_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: asset_id
+ in: path
+ required: true
+ schema:
+ type: integer
+ title: Asset Id
+ responses:
+ '204':
+ description: Successful Response
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /api/v2/assets/{asset_id}/states/{key}:
+ get:
+ tags:
+ - Asset State
+ summary: Get Asset State
+ description: Get a single asset state entry.
+ operationId: get_asset_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Key
+ - name: asset_id
+ in: path
+ required: true
+ schema:
+ type: integer
+ title: Asset Id
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetStateResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ put:
+ tags:
+ - Asset State
+ summary: Set Asset State
+ description: Set an asset state value. Creates or overwrites the key.
+ operationId: set_asset_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Key
+ - name: asset_id
+ in: path
+ required: true
+ schema:
+ type: integer
+ title: Asset Id
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/AssetStateBody'
+ responses:
+ '204':
+ description: Successful Response
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ delete:
+ tags:
+ - Asset State
+ summary: Delete Asset State
+ description: Delete a single asset state key. No-op if the key does not
exist.
+ operationId: delete_asset_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Key
+ - name: asset_id
+ in: path
+ required: true
+ schema:
+ type: integer
+ title: Asset Id
+ responses:
+ '204':
+ description: Successful Response
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states:
+ get:
+ tags:
+ - Task State
+ summary: List Task States
+ description: List all task state entries for a task instance.
+ operationId: list_task_states
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: map_index
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: -1
+ default: -1
+ title: Map Index
+ - name: limit
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 50
+ title: Limit
+ - name: offset
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: 0
+ default: 0
+ title: Offset
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/TaskStateCollectionResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ delete:
+ tags:
+ - Task State
+ summary: Clear Task State
+ description: 'Delete all task state keys for a task instance.
+
+
+ When ``all_map_indices=true``, state is cleared for every map index of
the
+ task and
+
+ the ``map_index`` parameter is ignored.'
+ operationId: clear_task_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: map_index
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: -1
+ default: -1
+ title: Map Index
+ - name: all_map_indices
+ in: query
+ required: false
+ schema:
+ type: boolean
+ default: false
+ title: All Map Indices
+ responses:
+ '204':
+ description: Successful Response
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}:
+ get:
+ tags:
+ - Task State
+ summary: Get Task State
+ description: Get a single task state entry.
+ operationId: get_task_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Key
+ - name: map_index
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: -1
+ default: -1
+ title: Map Index
+ responses:
+ '200':
+ description: Successful Response
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/TaskStateResponse'
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ put:
+ tags:
+ - Task State
+ summary: Set Task State
+ description: Set a task state value. Creates or overwrites the key.
+ operationId: set_task_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Key
+ - name: map_index
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: -1
+ default: -1
+ title: Map Index
+ requestBody:
+ required: true
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/TaskStateBody'
+ responses:
+ '204':
+ description: Successful Response
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
+ delete:
+ tags:
+ - Task State
+ summary: Delete Task State
+ description: Delete a single task state key. No-op if the key does not
exist.
+ operationId: delete_task_state
+ security:
+ - OAuth2PasswordBearer: []
+ - HTTPBearer: []
+ parameters:
+ - name: dag_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Id
+ - name: dag_run_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Dag Run Id
+ - name: task_id
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Task Id
+ - name: key
+ in: path
+ required: true
+ schema:
+ type: string
+ title: Key
+ - name: map_index
+ in: query
+ required: false
+ schema:
+ type: integer
+ minimum: -1
+ default: -1
+ title: Map Index
+ responses:
+ '204':
+ description: Successful Response
+ '401':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Unauthorized
+ '403':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Forbidden
+ '404':
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPExceptionResponse'
+ description: Not Found
+ '422':
+ description: Validation Error
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/HTTPValidationError'
/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}:
get:
tags:
@@ -10667,6 +11310,53 @@ components:
- watchers
title: AssetResponse
description: Asset serializer for responses.
+ AssetStateBody:
+ properties:
+ value:
+ type: string
+ maxLength: 65535
+ title: Value
+ additionalProperties: false
+ type: object
+ required:
+ - value
+ title: AssetStateBody
+ description: Request body for setting an asset state value.
+ AssetStateCollectionResponse:
+ properties:
+ asset_states:
+ items:
+ $ref: '#/components/schemas/AssetStateResponse'
+ type: array
+ title: Asset States
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - asset_states
+ - total_entries
+ title: AssetStateCollectionResponse
+ description: All asset state entries for an asset.
+ AssetStateResponse:
+ properties:
+ key:
+ type: string
+ title: Key
+ value:
+ type: string
+ title: Value
+ updated_at:
+ type: string
+ format: date-time
+ title: Updated At
+ type: object
+ required:
+ - key
+ - value
+ - updated_at
+ title: AssetStateResponse
+ description: A single asset state key/value pair with metadata.
AssetWatcherResponse:
properties:
name:
@@ -14945,6 +15635,60 @@ components:
- extra_links
title: TaskResponse
description: Task serializer for responses.
+ TaskStateBody:
+ properties:
+ value:
+ type: string
+ maxLength: 65535
+ title: Value
+ additionalProperties: false
+ type: object
+ required:
+ - value
+ title: TaskStateBody
+ description: Request body for setting a task state value.
+ TaskStateCollectionResponse:
+ properties:
+ task_states:
+ items:
+ $ref: '#/components/schemas/TaskStateResponse'
+ type: array
+ title: Task States
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - task_states
+ - total_entries
+ title: TaskStateCollectionResponse
+ description: All task state entries for a task instance.
+ TaskStateResponse:
+ properties:
+ key:
+ type: string
+ title: Key
+ value:
+ type: string
+ title: Value
+ updated_at:
+ type: string
+ format: date-time
+ title: Updated At
+ expires_at:
+ anyOf:
+ - type: string
+ format: date-time
+ - type: 'null'
+ title: Expires At
+ type: object
+ required:
+ - key
+ - value
+ - updated_at
+ - expires_at
+ title: TaskStateResponse
+ description: A single task state key/value pair with metadata.
TimeDelta:
properties:
__type:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
index b590424de4e..4e6ebba1178 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/__init__.py
@@ -21,6 +21,7 @@ from fastapi import Depends, status
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.routes.public.asset_state import
asset_state_router
from airflow.api_fastapi.core_api.routes.public.assets import assets_router
from airflow.api_fastapi.core_api.routes.public.auth import auth_router
from airflow.api_fastapi.core_api.routes.public.backfills import
backfills_router
@@ -45,6 +46,7 @@ from airflow.api_fastapi.core_api.routes.public.plugins
import plugins_router
from airflow.api_fastapi.core_api.routes.public.pools import pools_router
from airflow.api_fastapi.core_api.routes.public.providers import
providers_router
from airflow.api_fastapi.core_api.routes.public.task_instances import
task_instances_router
+from airflow.api_fastapi.core_api.routes.public.task_state import
task_state_router
from airflow.api_fastapi.core_api.routes.public.tasks import tasks_router
from airflow.api_fastapi.core_api.routes.public.variables import
variables_router
from airflow.api_fastapi.core_api.routes.public.version import version_router
@@ -79,6 +81,8 @@ authenticated_router.include_router(job_router)
authenticated_router.include_router(plugins_router)
authenticated_router.include_router(pools_router)
authenticated_router.include_router(providers_router)
+authenticated_router.include_router(asset_state_router)
+authenticated_router.include_router(task_state_router)
authenticated_router.include_router(xcom_router)
authenticated_router.include_router(task_instances_router)
authenticated_router.include_router(tasks_router)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
new file mode 100644
index 00000000000..43580e0c762
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
@@ -0,0 +1,163 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, status
+from sqlalchemy import select
+
+from airflow._shared.state import AssetScope
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.asset_state import (
+ AssetStateBody,
+ AssetStateCollectionResponse,
+ AssetStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_asset
+from airflow.models.asset import AssetModel
+from airflow.models.asset_state import AssetStateModel
+from airflow.state import get_state_backend
+
+asset_state_router = AirflowRouter(
+ tags=["Asset State"],
+ prefix="/assets/{asset_id}/states",
+)
+
+
+def _get_asset_or_404(asset_id: int, session: SessionDep) -> int:
+ exists = session.scalar(select(AssetModel.id).where(AssetModel.id ==
asset_id))
+ if exists is None:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Asset with id {asset_id!r} not found",
+ )
+ return asset_id
+
+
+AssetIdDep = Annotated[int, Depends(_get_asset_or_404)]
+
+
+@asset_state_router.get(
+ "",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def list_asset_states(
+ asset_id: AssetIdDep,
+ limit: QueryLimit,
+ offset: QueryOffset,
+ session: SessionDep,
+) -> AssetStateCollectionResponse:
+ """List all state entries for an asset."""
+ base = (
+ select(
+ AssetStateModel.key,
+ AssetStateModel.value,
+ AssetStateModel.updated_at,
+ )
+ .where(AssetStateModel.asset_id == asset_id)
+ .order_by(AssetStateModel.key.asc())
+ )
+ paginated, total_entries = paginated_select(
+ statement=base,
+ filters=None,
+ order_by=None,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
+ rows = session.execute(paginated).all()
+ entries = [AssetStateResponse(key=r.key, value=r.value,
updated_at=r.updated_at) for r in rows]
+ return AssetStateCollectionResponse(asset_states=entries,
total_entries=total_entries)
+
+
+@asset_state_router.get(
+ "/{key:path}",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_asset(method="GET"))],
+)
+def get_asset_state(
+ asset_id: AssetIdDep,
+ key: str,
+ session: SessionDep,
+) -> AssetStateResponse:
+ """Get a single asset state entry."""
+ row = session.execute(
+ select(
+ AssetStateModel.key,
+ AssetStateModel.value,
+ AssetStateModel.updated_at,
+ ).where(
+ AssetStateModel.asset_id == asset_id,
+ AssetStateModel.key == key,
+ )
+ ).one_or_none()
+ if row is None:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Asset state key {key!r} not found",
+ )
+ return AssetStateResponse(key=row.key, value=row.value,
updated_at=row.updated_at)
+
+
+@asset_state_router.put(
+ "/{key:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_asset(method="PUT"))],
+)
+def set_asset_state(
+ asset_id: AssetIdDep,
+ key: str,
+ body: AssetStateBody,
+ session: SessionDep,
+) -> None:
+ """Set an asset state value. Creates or overwrites the key."""
+ get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value,
session=session)
+
+
+@asset_state_router.delete(
+ "/{key:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_asset(method="DELETE"))],
+)
+def delete_asset_state(
+ asset_id: AssetIdDep,
+ key: str,
+ session: SessionDep,
+) -> None:
+ """Delete a single asset state key. No-op if the key does not exist."""
+ get_state_backend().delete(AssetScope(asset_id=asset_id), key,
session=session)
+
+
+@asset_state_router.delete(
+ "",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_asset(method="DELETE"))],
+)
+def clear_asset_state(
+ asset_id: AssetIdDep,
+ session: SessionDep,
+) -> None:
+ """Delete all state keys for an asset."""
+ get_state_backend().clear(AssetScope(asset_id=asset_id), session=session)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
new file mode 100644
index 00000000000..138380232a8
--- /dev/null
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
@@ -0,0 +1,210 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from typing import Annotated
+
+from fastapi import Depends, HTTPException, Query, status
+from sqlalchemy import select
+
+from airflow._shared.state import TaskScope
+from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity
+from airflow.api_fastapi.common.db.common import SessionDep, paginated_select
+from airflow.api_fastapi.common.parameters import QueryLimit, QueryOffset
+from airflow.api_fastapi.common.router import AirflowRouter
+from airflow.api_fastapi.core_api.datamodels.task_state import (
+ TaskStateBody,
+ TaskStateCollectionResponse,
+ TaskStateResponse,
+)
+from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
+from airflow.api_fastapi.core_api.security import requires_access_dag
+from airflow.models.task_state import TaskStateModel
+from airflow.models.taskinstance import TaskInstance as TI
+from airflow.state import get_state_backend
+
+task_state_router = AirflowRouter(
+ tags=["Task State"],
+
prefix="/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states",
+)
+
+
+def _get_scope(dag_id: str, dag_run_id: str, task_id: str, map_index: int) ->
TaskScope:
+ return TaskScope(dag_id=dag_id, run_id=dag_run_id, task_id=task_id,
map_index=map_index)
+
+
+@task_state_router.get(
+ "",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def list_task_states(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ limit: QueryLimit,
+ offset: QueryOffset,
+ session: SessionDep,
+ map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateCollectionResponse:
+ """List all task state entries for a task instance."""
+ base = (
+ select(
+ TaskStateModel.key,
+ TaskStateModel.value,
+ TaskStateModel.updated_at,
+ TaskStateModel.expires_at,
+ )
+ .where(
+ TaskStateModel.dag_id == dag_id,
+ TaskStateModel.run_id == dag_run_id,
+ TaskStateModel.task_id == task_id,
+ TaskStateModel.map_index == map_index,
+ )
+ .order_by(TaskStateModel.key.asc())
+ )
+ paginated, total_entries = paginated_select(
+ statement=base,
+ filters=None,
+ order_by=None,
+ offset=offset,
+ limit=limit,
+ session=session,
+ )
+ rows = session.execute(paginated).all()
+ entries = [
+ TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at,
expires_at=r.expires_at)
+ for r in rows
+ ]
+ return TaskStateCollectionResponse(task_states=entries,
total_entries=total_entries)
+
+
+@task_state_router.get(
+ "/{key:path}",
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_dag(method="GET",
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def get_task_state(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ session: SessionDep,
+ map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> TaskStateResponse:
+ """Get a single task state entry."""
+ row = session.execute(
+ select(
+ TaskStateModel.key,
+ TaskStateModel.value,
+ TaskStateModel.updated_at,
+ TaskStateModel.expires_at,
+ ).where(
+ TaskStateModel.dag_id == dag_id,
+ TaskStateModel.run_id == dag_run_id,
+ TaskStateModel.task_id == task_id,
+ TaskStateModel.map_index == map_index,
+ TaskStateModel.key == key,
+ )
+ ).one_or_none()
+ if row is None:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Task state key {key!r} not found",
+ )
+ return TaskStateResponse(
+ key=row.key, value=row.value, updated_at=row.updated_at,
expires_at=row.expires_at
+ )
+
+
+@task_state_router.put(
+ "/{key:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def set_task_state(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ body: TaskStateBody,
+ session: SessionDep,
+ map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+ """Set a task state value. Creates or overwrites the key."""
+ ti_exists = session.scalar(
+ select(TI.task_id).where(
+ TI.dag_id == dag_id,
+ TI.run_id == dag_run_id,
+ TI.task_id == task_id,
+ TI.map_index == map_index,
+ )
+ )
+ if ti_exists is None:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Task instance not found for dag_id={dag_id!r},
run_id={dag_run_id!r}, task_id={task_id!r}, map_index={map_index}",
+ )
+ scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+ try:
+ get_state_backend().set(scope, key, body.value, session=session)
+ except ValueError as e:
+ raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=str(e)) from e
+
+
+@task_state_router.delete(
+ "/{key:path}",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_dag(method="DELETE",
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def delete_task_state(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ key: str,
+ session: SessionDep,
+ map_index: Annotated[int, Query(ge=-1)] = -1,
+) -> None:
+ """Delete a single task state key. No-op if the key does not exist."""
+ scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+ get_state_backend().delete(scope, key, session=session)
+
+
+@task_state_router.delete(
+ "",
+ status_code=status.HTTP_204_NO_CONTENT,
+ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]),
+ dependencies=[Depends(requires_access_dag(method="DELETE",
access_entity=DagAccessEntity.TASK_INSTANCE))],
+)
+def clear_task_state(
+ dag_id: str,
+ dag_run_id: str,
+ task_id: str,
+ session: SessionDep,
+ map_index: Annotated[int, Query(ge=-1)] = -1,
+ all_map_indices: Annotated[bool, Query()] = False,
+) -> None:
+ """
+ Delete all task state keys for a task instance.
+
+ When ``all_map_indices=true``, state is cleared for every map index of the
task and
+ the ``map_index`` parameter is ignored.
+ """
+ scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
+ get_state_backend().clear(scope, all_map_indices=all_map_indices,
session=session)
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
index 127b09e8761..812a41d9d08 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/common.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseQueryResult } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService,
CalendarService, ConfigService, ConnectionService, DagParsingService,
DagRunService, DagService, DagSourceService, DagStatsService,
DagVersionService, DagWarningService, DashboardService, DeadlinesService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GanttService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PartitionedDagRunService, PluginService, P [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
export type AssetServiceGetAssetsDefaultResponse = Awaited<ReturnType<typeof
AssetService.getAssets>>;
export type AssetServiceGetAssetsQueryResult<TData =
AssetServiceGetAssetsDefaultResponse, TError = unknown> = UseQueryResult<TData,
TError>;
@@ -727,6 +727,42 @@ export const UseProviderServiceGetProvidersKeyFn = ({
limit, offset }: {
limit?: number;
offset?: number;
} = {}, queryKey?: Array<unknown>) => [useProviderServiceGetProvidersKey,
...(queryKey ?? [{ limit, offset }])];
+export type AssetStateServiceListAssetStatesDefaultResponse =
Awaited<ReturnType<typeof AssetStateService.listAssetStates>>;
+export type AssetStateServiceListAssetStatesQueryResult<TData =
AssetStateServiceListAssetStatesDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
+export const useAssetStateServiceListAssetStatesKey =
"AssetStateServiceListAssetStates";
+export const UseAssetStateServiceListAssetStatesKeyFn = ({ assetId, limit,
offset }: {
+ assetId: number;
+ limit?: number;
+ offset?: number;
+}, queryKey?: Array<unknown>) => [useAssetStateServiceListAssetStatesKey,
...(queryKey ?? [{ assetId, limit, offset }])];
+export type AssetStateServiceGetAssetStateDefaultResponse =
Awaited<ReturnType<typeof AssetStateService.getAssetState>>;
+export type AssetStateServiceGetAssetStateQueryResult<TData =
AssetStateServiceGetAssetStateDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
+export const useAssetStateServiceGetAssetStateKey =
"AssetStateServiceGetAssetState";
+export const UseAssetStateServiceGetAssetStateKeyFn = ({ assetId, key }: {
+ assetId: number;
+ key: string;
+}, queryKey?: Array<unknown>) => [useAssetStateServiceGetAssetStateKey,
...(queryKey ?? [{ assetId, key }])];
+export type TaskStateServiceListTaskStatesDefaultResponse =
Awaited<ReturnType<typeof TaskStateService.listTaskStates>>;
+export type TaskStateServiceListTaskStatesQueryResult<TData =
TaskStateServiceListTaskStatesDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
+export const useTaskStateServiceListTaskStatesKey =
"TaskStateServiceListTaskStates";
+export const UseTaskStateServiceListTaskStatesKeyFn = ({ dagId, dagRunId,
limit, mapIndex, offset, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+}, queryKey?: Array<unknown>) => [useTaskStateServiceListTaskStatesKey,
...(queryKey ?? [{ dagId, dagRunId, limit, mapIndex, offset, taskId }])];
+export type TaskStateServiceGetTaskStateDefaultResponse =
Awaited<ReturnType<typeof TaskStateService.getTaskState>>;
+export type TaskStateServiceGetTaskStateQueryResult<TData =
TaskStateServiceGetTaskStateDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
+export const useTaskStateServiceGetTaskStateKey =
"TaskStateServiceGetTaskState";
+export const UseTaskStateServiceGetTaskStateKeyFn = ({ dagId, dagRunId, key,
mapIndex, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+}, queryKey?: Array<unknown>) => [useTaskStateServiceGetTaskStateKey,
...(queryKey ?? [{ dagId, dagRunId, key, mapIndex, taskId }])];
export type XcomServiceGetXcomEntryDefaultResponse = Awaited<ReturnType<typeof
XcomService.getXcomEntry>>;
export type XcomServiceGetXcomEntryQueryResult<TData =
XcomServiceGetXcomEntryDefaultResponse, TError = unknown> =
UseQueryResult<TData, TError>;
export const useXcomServiceGetXcomEntryKey = "XcomServiceGetXcomEntry";
@@ -1012,6 +1048,8 @@ export type AuthLinksServiceGenerateTokenMutationResult =
Awaited<ReturnType<typ
export type BackfillServicePauseBackfillMutationResult =
Awaited<ReturnType<typeof BackfillService.pauseBackfill>>;
export type BackfillServiceUnpauseBackfillMutationResult =
Awaited<ReturnType<typeof BackfillService.unpauseBackfill>>;
export type BackfillServiceCancelBackfillMutationResult =
Awaited<ReturnType<typeof BackfillService.cancelBackfill>>;
+export type AssetStateServiceSetAssetStateMutationResult =
Awaited<ReturnType<typeof AssetStateService.setAssetState>>;
+export type TaskStateServiceSetTaskStateMutationResult =
Awaited<ReturnType<typeof TaskStateService.setTaskState>>;
export type DagParsingServiceReparseDagFileMutationResult =
Awaited<ReturnType<typeof DagParsingService.reparseDagFile>>;
export type ConnectionServicePatchConnectionMutationResult =
Awaited<ReturnType<typeof ConnectionService.patchConnection>>;
export type ConnectionServiceBulkConnectionsMutationResult =
Awaited<ReturnType<typeof ConnectionService.bulkConnections>>;
@@ -1039,5 +1077,9 @@ export type DagRunServiceDeleteDagRunMutationResult =
Awaited<ReturnType<typeof
export type DagServiceDeleteDagMutationResult = Awaited<ReturnType<typeof
DagService.deleteDag>>;
export type TaskInstanceServiceDeleteTaskInstanceMutationResult =
Awaited<ReturnType<typeof TaskInstanceService.deleteTaskInstance>>;
export type PoolServiceDeletePoolMutationResult = Awaited<ReturnType<typeof
PoolService.deletePool>>;
+export type AssetStateServiceClearAssetStateMutationResult =
Awaited<ReturnType<typeof AssetStateService.clearAssetState>>;
+export type AssetStateServiceDeleteAssetStateMutationResult =
Awaited<ReturnType<typeof AssetStateService.deleteAssetState>>;
+export type TaskStateServiceClearTaskStateMutationResult =
Awaited<ReturnType<typeof TaskStateService.clearTaskState>>;
+export type TaskStateServiceDeleteTaskStateMutationResult =
Awaited<ReturnType<typeof TaskStateService.deleteTaskState>>;
export type XcomServiceDeleteXcomEntryMutationResult =
Awaited<ReturnType<typeof XcomService.deleteXcomEntry>>;
export type VariableServiceDeleteVariableMutationResult =
Awaited<ReturnType<typeof VariableService.deleteVariable>>;
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
index 835c1c46dbf..32d74fed6e9 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PartitionedDagRunService, PluginService,
PoolService, ProviderService, Structure [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService,
CalendarService, ConfigService, ConnectionService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1476,6 +1476,74 @@ export const ensureUseProviderServiceGetProvidersData =
(queryClient: QueryClien
offset?: number;
} = {}) => queryClient.ensureQueryData({ queryKey:
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }), queryFn: () =>
ProviderService.getProviders({ limit, offset }) });
/**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseAssetStateServiceListAssetStatesData = (queryClient:
QueryClient, { assetId, limit, offset }: {
+ assetId: number;
+ limit?: number;
+ offset?: number;
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }),
queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseAssetStateServiceGetAssetStateData = (queryClient:
QueryClient, { assetId, key }: {
+ assetId: number;
+ key: string;
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }), queryFn: () =>
AssetStateService.getAssetState({ assetId, key }) });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseTaskStateServiceListTaskStatesData = (queryClient:
QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit,
mapIndex, offset, taskId }), queryFn: () => TaskStateService.listTaskStates({
dagId, dagRunId, limit, mapIndex, offset, taskId }) });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const ensureUseTaskStateServiceGetTaskStateData = (queryClient:
QueryClient, { dagId, dagRunId, key, mapIndex, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+}) => queryClient.ensureQueryData({ queryKey:
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex,
taskId }), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key,
mapIndex, taskId }) });
+/**
* Get Xcom Entry
* Get an XCom entry.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
index 06e09a390f4..ae012539160 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { type QueryClient } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PartitionedDagRunService, PluginService,
PoolService, ProviderService, Structure [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService,
CalendarService, ConfigService, ConnectionService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1476,6 +1476,74 @@ export const prefetchUseProviderServiceGetProviders =
(queryClient: QueryClient,
offset?: number;
} = {}) => queryClient.prefetchQuery({ queryKey:
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }), queryFn: () =>
ProviderService.getProviders({ limit, offset }) });
/**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseAssetStateServiceListAssetStates = (queryClient:
QueryClient, { assetId, limit, offset }: {
+ assetId: number;
+ limit?: number;
+ offset?: number;
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset }),
queryFn: () => AssetStateService.listAssetStates({ assetId, limit, offset }) });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseAssetStateServiceGetAssetState = (queryClient:
QueryClient, { assetId, key }: {
+ assetId: number;
+ key: string;
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }), queryFn: () =>
AssetStateService.getAssetState({ assetId, key }) });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseTaskStateServiceListTaskStates = (queryClient:
QueryClient, { dagId, dagRunId, limit, mapIndex, offset, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit,
mapIndex, offset, taskId }), queryFn: () => TaskStateService.listTaskStates({
dagId, dagRunId, limit, mapIndex, offset, taskId }) });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const prefetchUseTaskStateServiceGetTaskState = (queryClient:
QueryClient, { dagId, dagRunId, key, mapIndex, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+}) => queryClient.prefetchQuery({ queryKey:
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex,
taskId }), queryFn: () => TaskStateService.getTaskState({ dagId, dagRunId, key,
mapIndex, taskId }) });
+/**
* Get Xcom Entry
* Get an XCom entry.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
index fa3a1faf0f9..17ee1009ae0 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts
@@ -1,8 +1,8 @@
// generated with @7nohe/[email protected]
import { UseMutationOptions, UseQueryOptions, useMutation, useQuery } from
"@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagParsingService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
-import { BackfillPostBody, BulkBody_BulkTaskInstanceBody_,
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_,
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody,
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState,
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody,
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TriggerDAGRunPostBody,
UpdateHITLDetailPayload, VariableBody, XComCreateBody, XComUpdateBody [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService,
CalendarService, ConfigService, ConnectionService, DagParsingService,
DagRunService, DagService, DagSourceService, DagStatsService,
DagVersionService, DagWarningService, DashboardService, DeadlinesService,
DependenciesService, EventLogService, ExperimentalService, ExtraLinksService,
GanttService, GridService, ImportErrorService, JobService, LoginService,
MonitorService, PartitionedDagRunService, PluginService, P [...]
+import { AssetStateBody, BackfillPostBody, BulkBody_BulkTaskInstanceBody_,
BulkBody_ConnectionBody_, BulkBody_PoolBody_, BulkBody_VariableBody_,
ClearTaskInstancesBody, ConnectionBody, CreateAssetEventsBody, DAGPatchBody,
DAGRunClearBody, DAGRunPatchBody, DAGRunsBatchBody, DagRunState,
DagWarningType, GenerateTokenBody, MaterializeAssetBody, PatchTaskInstanceBody,
PoolBody, PoolPatchBody, TaskInstancesBatchBody, TaskStateBody,
TriggerDAGRunPostBody, UpdateHITLDetailPayload, VariableBody, [...]
import * as Common from "./common";
/**
* Get Assets
@@ -1476,6 +1476,74 @@ export const useProviderServiceGetProviders = <TData =
Common.ProviderServiceGet
offset?: number;
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }, queryKey),
queryFn: () => ProviderService.getProviders({ limit, offset }) as TData,
...options });
/**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceListAssetStates = <TData =
Common.AssetStateServiceListAssetStatesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ assetId, limit, offset }: {
+ assetId: number;
+ limit?: number;
+ offset?: number;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset },
queryKey), queryFn: () => AssetStateService.listAssetStates({ assetId, limit,
offset }) as TData, ...options });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceGetAssetState = <TData =
Common.AssetStateServiceGetAssetStateDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ assetId, key }: {
+ assetId: number;
+ key: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }, queryKey),
queryFn: () => AssetStateService.getAssetState({ assetId, key }) as TData,
...options });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceListTaskStates = <TData =
Common.TaskStateServiceListTaskStatesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, limit,
mapIndex, offset, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit,
mapIndex, offset, taskId }, queryKey), queryFn: () =>
TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset,
taskId }) as TData, ...options });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceGetTaskState = <TData =
Common.TaskStateServiceGetTaskStateDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ dagId, dagRunId, key, mapIndex, taskId }:
{
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey:
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex,
taskId }, queryKey), queryFn: () => TaskStateService.getTaskState({ dagId,
dagRunId, key, mapIndex, taskId }) as TData, ...options });
+/**
* Get Xcom Entry
* Get an XCom entry.
* @param data The data for the request.
@@ -2300,6 +2368,53 @@ export const useBackfillServiceCancelBackfill = <TData =
Common.BackfillServiceC
backfillId: number;
}, TContext>({ mutationFn: ({ backfillId }) =>
BackfillService.cancelBackfill({ backfillId }) as unknown as Promise<TData>,
...options });
/**
+* Set Asset State
+* Set an asset state value. Creates or overwrites the key.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @param data.requestBody
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceSetAssetState = <TData =
Common.AssetStateServiceSetAssetStateMutationResult, TError = unknown, TContext
= unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ assetId: number;
+ key: string;
+ requestBody: AssetStateBody;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ assetId: number;
+ key: string;
+ requestBody: AssetStateBody;
+}, TContext>({ mutationFn: ({ assetId, key, requestBody }) =>
AssetStateService.setAssetState({ assetId, key, requestBody }) as unknown as
Promise<TData>, ...options });
+/**
+* Set Task State
+* Set a task state value. Creates or overwrites the key.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.requestBody
+* @param data.mapIndex
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceSetTaskState = <TData =
Common.TaskStateServiceSetTaskStateMutationResult, TError = unknown, TContext =
unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ requestBody: TaskStateBody;
+ taskId: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ requestBody: TaskStateBody;
+ taskId: string;
+}, TContext>({ mutationFn: ({ dagId, dagRunId, key, mapIndex, requestBody,
taskId }) => TaskStateService.setTaskState({ dagId, dagRunId, key, mapIndex,
requestBody, taskId }) as unknown as Promise<TData>, ...options });
+/**
* Reparse Dag File
* Request re-parsing a Dag file.
* @param data The data for the request.
@@ -2854,6 +2969,88 @@ export const usePoolServiceDeletePool = <TData =
Common.PoolServiceDeletePoolMut
poolName: string;
}, TContext>({ mutationFn: ({ poolName }) => PoolService.deletePool({ poolName
}) as unknown as Promise<TData>, ...options });
/**
+* Clear Asset State
+* Delete all state keys for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceClearAssetState = <TData =
Common.AssetStateServiceClearAssetStateMutationResult, TError = unknown,
TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ assetId: number;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ assetId: number;
+}, TContext>({ mutationFn: ({ assetId }) =>
AssetStateService.clearAssetState({ assetId }) as unknown as Promise<TData>,
...options });
+/**
+* Delete Asset State
+* Delete a single asset state key. No-op if the key does not exist.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceDeleteAssetState = <TData =
Common.AssetStateServiceDeleteAssetStateMutationResult, TError = unknown,
TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ assetId: number;
+ key: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ assetId: number;
+ key: string;
+}, TContext>({ mutationFn: ({ assetId, key }) =>
AssetStateService.deleteAssetState({ assetId, key }) as unknown as
Promise<TData>, ...options });
+/**
+* Clear Task State
+* Delete all task state keys for a task instance.
+*
+* When ``all_map_indices=true``, state is cleared for every map index of the
task and
+* the ``map_index`` parameter is ignored.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.allMapIndices
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceClearTaskState = <TData =
Common.TaskStateServiceClearTaskStateMutationResult, TError = unknown, TContext
= unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ allMapIndices?: boolean;
+ dagId: string;
+ dagRunId: string;
+ mapIndex?: number;
+ taskId: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ allMapIndices?: boolean;
+ dagId: string;
+ dagRunId: string;
+ mapIndex?: number;
+ taskId: string;
+}, TContext>({ mutationFn: ({ allMapIndices, dagId, dagRunId, mapIndex, taskId
}) => TaskStateService.clearTaskState({ allMapIndices, dagId, dagRunId,
mapIndex, taskId }) as unknown as Promise<TData>, ...options });
+/**
+* Delete Task State
+* Delete a single task state key. No-op if the key does not exist.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns void Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceDeleteTaskState = <TData =
Common.TaskStateServiceDeleteTaskStateMutationResult, TError = unknown,
TContext = unknown>(options?: Omit<UseMutationOptions<TData, TError, {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+}, TContext>, "mutationFn">) => useMutation<TData, TError, {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+}, TContext>({ mutationFn: ({ dagId, dagRunId, key, mapIndex, taskId }) =>
TaskStateService.deleteTaskState({ dagId, dagRunId, key, mapIndex, taskId }) as
unknown as Promise<TData>, ...options });
+/**
* Delete Xcom Entry
* Delete an XCom entry.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
index 34fa5aee502..8cdfee041b6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts
@@ -1,7 +1,7 @@
// generated with @7nohe/[email protected]
import { UseQueryOptions, useSuspenseQuery } from "@tanstack/react-query";
-import { AssetService, AuthLinksService, BackfillService, CalendarService,
ConfigService, ConnectionService, DagRunService, DagService, DagSourceService,
DagStatsService, DagVersionService, DagWarningService, DashboardService,
DeadlinesService, DependenciesService, EventLogService, ExperimentalService,
ExtraLinksService, GanttService, GridService, ImportErrorService, JobService,
LoginService, MonitorService, PartitionedDagRunService, PluginService,
PoolService, ProviderService, Structure [...]
+import { AssetService, AssetStateService, AuthLinksService, BackfillService,
CalendarService, ConfigService, ConnectionService, DagRunService, DagService,
DagSourceService, DagStatsService, DagVersionService, DagWarningService,
DashboardService, DeadlinesService, DependenciesService, EventLogService,
ExperimentalService, ExtraLinksService, GanttService, GridService,
ImportErrorService, JobService, LoginService, MonitorService,
PartitionedDagRunService, PluginService, PoolService, Provide [...]
import { DagRunState, DagWarningType } from "../requests/types.gen";
import * as Common from "./common";
/**
@@ -1476,6 +1476,74 @@ export const useProviderServiceGetProvidersSuspense =
<TData = Common.ProviderSe
offset?: number;
} = {}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseProviderServiceGetProvidersKeyFn({ limit, offset }, queryKey),
queryFn: () => ProviderService.getProviders({ limit, offset }) as TData,
...options });
/**
+* List Asset States
+* List all state entries for an asset.
+* @param data The data for the request.
+* @param data.assetId
+* @param data.limit
+* @param data.offset
+* @returns AssetStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceListAssetStatesSuspense = <TData =
Common.AssetStateServiceListAssetStatesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ assetId, limit, offset }: {
+ assetId: number;
+ limit?: number;
+ offset?: number;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseAssetStateServiceListAssetStatesKeyFn({ assetId, limit, offset },
queryKey), queryFn: () => AssetStateService.listAssetStates({ assetId, limit,
offset }) as TData, ...options });
+/**
+* Get Asset State
+* Get a single asset state entry.
+* @param data The data for the request.
+* @param data.key
+* @param data.assetId
+* @returns AssetStateResponse Successful Response
+* @throws ApiError
+*/
+export const useAssetStateServiceGetAssetStateSuspense = <TData =
Common.AssetStateServiceGetAssetStateDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ assetId, key }: {
+ assetId: number;
+ key: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseAssetStateServiceGetAssetStateKeyFn({ assetId, key }, queryKey),
queryFn: () => AssetStateService.getAssetState({ assetId, key }) as TData,
...options });
+/**
+* List Task States
+* List all task state entries for a task instance.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.mapIndex
+* @param data.limit
+* @param data.offset
+* @returns TaskStateCollectionResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceListTaskStatesSuspense = <TData =
Common.TaskStateServiceListTaskStatesDefaultResponse, TError = unknown,
TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, limit,
mapIndex, offset, taskId }: {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseTaskStateServiceListTaskStatesKeyFn({ dagId, dagRunId, limit,
mapIndex, offset, taskId }, queryKey), queryFn: () =>
TaskStateService.listTaskStates({ dagId, dagRunId, limit, mapIndex, offset,
taskId }) as TData, ...options });
+/**
+* Get Task State
+* Get a single task state entry.
+* @param data The data for the request.
+* @param data.dagId
+* @param data.dagRunId
+* @param data.taskId
+* @param data.key
+* @param data.mapIndex
+* @returns TaskStateResponse Successful Response
+* @throws ApiError
+*/
+export const useTaskStateServiceGetTaskStateSuspense = <TData =
Common.TaskStateServiceGetTaskStateDefaultResponse, TError = unknown, TQueryKey
extends Array<unknown> = unknown[]>({ dagId, dagRunId, key, mapIndex, taskId }:
{
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>,
"queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey:
Common.UseTaskStateServiceGetTaskStateKeyFn({ dagId, dagRunId, key, mapIndex,
taskId }, queryKey), queryFn: () => TaskStateService.getTaskState({ dagId,
dagRunId, key, mapIndex, taskId }) as TData, ...options });
+/**
* Get Xcom Entry
* Get an XCom entry.
* @param data The data for the request.
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 99b1c493455..9fc0f4c7fb5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -381,6 +381,63 @@ export const $AssetResponse = {
description: 'Asset serializer for responses.'
} as const;
+export const $AssetStateBody = {
+ properties: {
+ value: {
+ type: 'string',
+ maxLength: 65535,
+ title: 'Value'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['value'],
+ title: 'AssetStateBody',
+ description: 'Request body for setting an asset state value.'
+} as const;
+
+export const $AssetStateCollectionResponse = {
+ properties: {
+ asset_states: {
+ items: {
+ '$ref': '#/components/schemas/AssetStateResponse'
+ },
+ type: 'array',
+ title: 'Asset States'
+ },
+ total_entries: {
+ type: 'integer',
+ title: 'Total Entries'
+ }
+ },
+ type: 'object',
+ required: ['asset_states', 'total_entries'],
+ title: 'AssetStateCollectionResponse',
+ description: 'All asset state entries for an asset.'
+} as const;
+
+export const $AssetStateResponse = {
+ properties: {
+ key: {
+ type: 'string',
+ title: 'Key'
+ },
+ value: {
+ type: 'string',
+ title: 'Value'
+ },
+ updated_at: {
+ type: 'string',
+ format: 'date-time',
+ title: 'Updated At'
+ }
+ },
+ type: 'object',
+ required: ['key', 'value', 'updated_at'],
+ title: 'AssetStateResponse',
+ description: 'A single asset state key/value pair with metadata.'
+} as const;
+
export const $AssetWatcherResponse = {
properties: {
name: {
@@ -6738,6 +6795,75 @@ export const $TaskResponse = {
description: 'Task serializer for responses.'
} as const;
+export const $TaskStateBody = {
+ properties: {
+ value: {
+ type: 'string',
+ maxLength: 65535,
+ title: 'Value'
+ }
+ },
+ additionalProperties: false,
+ type: 'object',
+ required: ['value'],
+ title: 'TaskStateBody',
+ description: 'Request body for setting a task state value.'
+} as const;
+
+export const $TaskStateCollectionResponse = {
+ properties: {
+ task_states: {
+ items: {
+ '$ref': '#/components/schemas/TaskStateResponse'
+ },
+ type: 'array',
+ title: 'Task States'
+ },
+ total_entries: {
+ type: 'integer',
+ title: 'Total Entries'
+ }
+ },
+ type: 'object',
+ required: ['task_states', 'total_entries'],
+ title: 'TaskStateCollectionResponse',
+ description: 'All task state entries for a task instance.'
+} as const;
+
+export const $TaskStateResponse = {
+ properties: {
+ key: {
+ type: 'string',
+ title: 'Key'
+ },
+ value: {
+ type: 'string',
+ title: 'Value'
+ },
+ updated_at: {
+ type: 'string',
+ format: 'date-time',
+ title: 'Updated At'
+ },
+ expires_at: {
+ anyOf: [
+ {
+ type: 'string',
+ format: 'date-time'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Expires At'
+ }
+ },
+ type: 'object',
+ required: ['key', 'value', 'updated_at', 'expires_at'],
+ title: 'TaskStateResponse',
+ description: 'A single task state key/value pair with metadata.'
+} as const;
+
export const $TimeDelta = {
properties: {
__type: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
index 1e24595089b..bc1f4471470 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts
@@ -3,7 +3,7 @@
import type { CancelablePromise } from './core/CancelablePromise';
import { OpenAPI } from './core/OpenAPI';
import { request as __request } from './core/request';
-import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
+import type { GetAssetsData, GetAssetsResponse, GetAssetAliasesData,
GetAssetAliasesResponse, GetAssetAliasData, GetAssetAliasResponse,
GetAssetEventsData, GetAssetEventsResponse, CreateAssetEventData,
CreateAssetEventResponse, MaterializeAssetData, MaterializeAssetResponse,
GetAssetQueuedEventsData, GetAssetQueuedEventsResponse,
DeleteAssetQueuedEventsData, DeleteAssetQueuedEventsResponse, GetAssetData,
GetAssetResponse, GetDagAssetQueuedEventsData, GetDagAssetQueuedEventsResponse,
Dele [...]
export class AssetService {
/**
@@ -3505,6 +3505,325 @@ export class ProviderService {
}
+export class AssetStateService {
+ /**
+ * List Asset States
+ * List all state entries for an asset.
+ * @param data The data for the request.
+ * @param data.assetId
+ * @param data.limit
+ * @param data.offset
+ * @returns AssetStateCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static listAssetStates(data: ListAssetStatesData):
CancelablePromise<ListAssetStatesResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url: '/api/v2/assets/{asset_id}/states',
+ path: {
+ asset_id: data.assetId
+ },
+ query: {
+ limit: data.limit,
+ offset: data.offset
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Clear Asset State
+ * Delete all state keys for an asset.
+ * @param data The data for the request.
+ * @param data.assetId
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static clearAssetState(data: ClearAssetStateData):
CancelablePromise<ClearAssetStateResponse> {
+ return __request(OpenAPI, {
+ method: 'DELETE',
+ url: '/api/v2/assets/{asset_id}/states',
+ path: {
+ asset_id: data.assetId
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Get Asset State
+ * Get a single asset state entry.
+ * @param data The data for the request.
+ * @param data.key
+ * @param data.assetId
+ * @returns AssetStateResponse Successful Response
+ * @throws ApiError
+ */
+ public static getAssetState(data: GetAssetStateData):
CancelablePromise<GetAssetStateResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url: '/api/v2/assets/{asset_id}/states/{key}',
+ path: {
+ key: data.key,
+ asset_id: data.assetId
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Set Asset State
+ * Set an asset state value. Creates or overwrites the key.
+ * @param data The data for the request.
+ * @param data.key
+ * @param data.assetId
+ * @param data.requestBody
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static setAssetState(data: SetAssetStateData):
CancelablePromise<SetAssetStateResponse> {
+ return __request(OpenAPI, {
+ method: 'PUT',
+ url: '/api/v2/assets/{asset_id}/states/{key}',
+ path: {
+ key: data.key,
+ asset_id: data.assetId
+ },
+ body: data.requestBody,
+ mediaType: 'application/json',
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Delete Asset State
+ * Delete a single asset state key. No-op if the key does not exist.
+ * @param data The data for the request.
+ * @param data.key
+ * @param data.assetId
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static deleteAssetState(data: DeleteAssetStateData):
CancelablePromise<DeleteAssetStateResponse> {
+ return __request(OpenAPI, {
+ method: 'DELETE',
+ url: '/api/v2/assets/{asset_id}/states/{key}',
+ path: {
+ key: data.key,
+ asset_id: data.assetId
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+}
+
+export class TaskStateService {
+ /**
+ * List Task States
+ * List all task state entries for a task instance.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.mapIndex
+ * @param data.limit
+ * @param data.offset
+ * @returns TaskStateCollectionResponse Successful Response
+ * @throws ApiError
+ */
+ public static listTaskStates(data: ListTaskStatesData):
CancelablePromise<ListTaskStatesResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url:
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states',
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId
+ },
+ query: {
+ map_index: data.mapIndex,
+ limit: data.limit,
+ offset: data.offset
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Clear Task State
+ * Delete all task state keys for a task instance.
+ *
+ * When ``all_map_indices=true``, state is cleared for every map index of
the task and
+ * the ``map_index`` parameter is ignored.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.mapIndex
+ * @param data.allMapIndices
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static clearTaskState(data: ClearTaskStateData):
CancelablePromise<ClearTaskStateResponse> {
+ return __request(OpenAPI, {
+ method: 'DELETE',
+ url:
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states',
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId
+ },
+ query: {
+ map_index: data.mapIndex,
+ all_map_indices: data.allMapIndices
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Get Task State
+ * Get a single task state entry.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.key
+ * @param data.mapIndex
+ * @returns TaskStateResponse Successful Response
+ * @throws ApiError
+ */
+ public static getTaskState(data: GetTaskStateData):
CancelablePromise<GetTaskStateResponse> {
+ return __request(OpenAPI, {
+ method: 'GET',
+ url:
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}',
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId,
+ key: data.key
+ },
+ query: {
+ map_index: data.mapIndex
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Set Task State
+ * Set a task state value. Creates or overwrites the key.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.key
+ * @param data.requestBody
+ * @param data.mapIndex
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static setTaskState(data: SetTaskStateData):
CancelablePromise<SetTaskStateResponse> {
+ return __request(OpenAPI, {
+ method: 'PUT',
+ url:
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}',
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId,
+ key: data.key
+ },
+ query: {
+ map_index: data.mapIndex
+ },
+ body: data.requestBody,
+ mediaType: 'application/json',
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+ /**
+ * Delete Task State
+ * Delete a single task state key. No-op if the key does not exist.
+ * @param data The data for the request.
+ * @param data.dagId
+ * @param data.dagRunId
+ * @param data.taskId
+ * @param data.key
+ * @param data.mapIndex
+ * @returns void Successful Response
+ * @throws ApiError
+ */
+ public static deleteTaskState(data: DeleteTaskStateData):
CancelablePromise<DeleteTaskStateResponse> {
+ return __request(OpenAPI, {
+ method: 'DELETE',
+ url:
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}',
+ path: {
+ dag_id: data.dagId,
+ dag_run_id: data.dagRunId,
+ task_id: data.taskId,
+ key: data.key
+ },
+ query: {
+ map_index: data.mapIndex
+ },
+ errors: {
+ 401: 'Unauthorized',
+ 403: 'Forbidden',
+ 404: 'Not Found',
+ 422: 'Validation Error'
+ }
+ });
+ }
+
+}
+
export class XcomService {
/**
* Get Xcom Entry
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 5db38b8e961..ee025fc11a5 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -96,6 +96,30 @@ export type AssetResponse = {
last_asset_event?: LastAssetEventResponse | null;
};
+/**
+ * Request body for setting an asset state value.
+ */
+export type AssetStateBody = {
+ value: string;
+};
+
+/**
+ * All asset state entries for an asset.
+ */
+export type AssetStateCollectionResponse = {
+ asset_states: Array<AssetStateResponse>;
+ total_entries: number;
+};
+
+/**
+ * A single asset state key/value pair with metadata.
+ */
+export type AssetStateResponse = {
+ key: string;
+ value: string;
+ updated_at: string;
+};
+
/**
* Asset watcher serializer for responses.
*/
@@ -1631,6 +1655,31 @@ export type TaskResponse = {
readonly extra_links: Array<(string)>;
};
+/**
+ * Request body for setting a task state value.
+ */
+export type TaskStateBody = {
+ value: string;
+};
+
+/**
+ * All task state entries for a task instance.
+ */
+export type TaskStateCollectionResponse = {
+ task_states: Array<TaskStateResponse>;
+ total_entries: number;
+};
+
+/**
+ * A single task state key/value pair with metadata.
+ */
+export type TaskStateResponse = {
+ key: string;
+ value: string;
+ updated_at: string;
+ expires_at: string | null;
+};
+
/**
* TimeDelta can be used to interact with datetime.timedelta objects.
*/
@@ -3775,6 +3824,94 @@ export type GetProvidersData = {
export type GetProvidersResponse = ProviderCollectionResponse;
+export type ListAssetStatesData = {
+ assetId: number;
+ limit?: number;
+ offset?: number;
+};
+
+export type ListAssetStatesResponse = AssetStateCollectionResponse;
+
+export type ClearAssetStateData = {
+ assetId: number;
+};
+
+export type ClearAssetStateResponse = void;
+
+export type GetAssetStateData = {
+ assetId: number;
+ key: string;
+};
+
+export type GetAssetStateResponse = AssetStateResponse;
+
+export type SetAssetStateData = {
+ assetId: number;
+ key: string;
+ requestBody: AssetStateBody;
+};
+
+export type SetAssetStateResponse = void;
+
+export type DeleteAssetStateData = {
+ assetId: number;
+ key: string;
+};
+
+export type DeleteAssetStateResponse = void;
+
+export type ListTaskStatesData = {
+ dagId: string;
+ dagRunId: string;
+ limit?: number;
+ mapIndex?: number;
+ offset?: number;
+ taskId: string;
+};
+
+export type ListTaskStatesResponse = TaskStateCollectionResponse;
+
+export type ClearTaskStateData = {
+ allMapIndices?: boolean;
+ dagId: string;
+ dagRunId: string;
+ mapIndex?: number;
+ taskId: string;
+};
+
+export type ClearTaskStateResponse = void;
+
+export type GetTaskStateData = {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+};
+
+export type GetTaskStateResponse = TaskStateResponse;
+
+export type SetTaskStateData = {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ requestBody: TaskStateBody;
+ taskId: string;
+};
+
+export type SetTaskStateResponse = void;
+
+export type DeleteTaskStateData = {
+ dagId: string;
+ dagRunId: string;
+ key: string;
+ mapIndex?: number;
+ taskId: string;
+};
+
+export type DeleteTaskStateResponse = void;
+
export type GetXcomEntryData = {
dagId: string;
dagRunId: string;
@@ -6810,6 +6947,264 @@ export type $OpenApiTs = {
};
};
};
+ '/api/v2/assets/{asset_id}/states': {
+ get: {
+ req: ListAssetStatesData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: AssetStateCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ delete: {
+ req: ClearAssetStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 204: void;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+ '/api/v2/assets/{asset_id}/states/{key}': {
+ get: {
+ req: GetAssetStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: AssetStateResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ put: {
+ req: SetAssetStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 204: void;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ delete: {
+ req: DeleteAssetStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 204: void;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states': {
+ get: {
+ req: ListTaskStatesData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: TaskStateCollectionResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ delete: {
+ req: ClearTaskStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 204: void;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
+
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/states/{key}':
{
+ get: {
+ req: GetTaskStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 200: TaskStateResponse;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ put: {
+ req: SetTaskStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 204: void;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ delete: {
+ req: DeleteTaskStateData;
+ res: {
+ /**
+ * Successful Response
+ */
+ 204: void;
+ /**
+ * Unauthorized
+ */
+ 401: HTTPExceptionResponse;
+ /**
+ * Forbidden
+ */
+ 403: HTTPExceptionResponse;
+ /**
+ * Not Found
+ */
+ 404: HTTPExceptionResponse;
+ /**
+ * Validation Error
+ */
+ 422: HTTPValidationError;
+ };
+ };
+ };
'/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key}':
{
get: {
req: GetXcomEntryData;
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
new file mode 100644
index 00000000000..c53fbb99ee9
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
@@ -0,0 +1,255 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+
+from airflow.models.asset import AssetModel
+from airflow.models.asset_state import AssetStateModel
+
+from tests_common.test_utils.db import clear_db_assets
+
+pytestmark = pytest.mark.db_test
+
+ASSET_URI = "s3://bucket/watermarks"
+ASSET_NAME = "test_asset"
+
+
+def _create_asset(session) -> AssetModel:
+ asset = AssetModel(uri=ASSET_URI, name=ASSET_NAME, group="test")
+ session.add(asset)
+ session.flush()
+ return asset
+
+
+def _create_asset_state(session, asset_id: int, key: str, value: str) -> None:
+ row = AssetStateModel(asset_id=asset_id, key=key, value=value)
+ session.add(row)
+ session.flush()
+
+
+class TestAssetStateEndpoint:
+ @staticmethod
+ def clear_db():
+ clear_db_assets()
+
+ @pytest.fixture(autouse=True)
+ def setup(self, session):
+ self.clear_db()
+ self.asset = _create_asset(session)
+ session.commit()
+ self._session = session
+ self._base_url = f"/assets/{self.asset.id}/states"
+
+ def teardown_method(self):
+ self.clear_db()
+
+
+class TestAssetNotFound(TestAssetStateEndpoint):
+ """All endpoints return 404 when asset_id does not exist."""
+
+ def test_list_unknown_asset_returns_404(self, test_client):
+ assert test_client.get("/assets/99999/states").status_code == 404
+
+ def test_get_unknown_asset_returns_404(self, test_client):
+ assert test_client.get("/assets/99999/states/key").status_code == 404
+
+ def test_set_unknown_asset_returns_404(self, test_client):
+ assert test_client.put("/assets/99999/states/key", json={"value":
"v"}).status_code == 404
+
+ def test_delete_unknown_asset_returns_404(self, test_client):
+ assert test_client.delete("/assets/99999/states/key").status_code ==
404
+
+ def test_clear_unknown_asset_returns_404(self, test_client):
+ assert test_client.delete("/assets/99999/states").status_code == 404
+
+
+class TestListAssetState(TestAssetStateEndpoint):
+ def test_returns_empty_list_when_no_state(self, test_client):
+ response = test_client.get(self._base_url)
+ assert response.status_code == 200
+ assert response.json() == {"asset_states": [], "total_entries": 0}
+
+ def test_returns_all_keys(self, test_client):
+ _create_asset_state(self._session, self.asset.id, "watermark",
"2026-05-01")
+ _create_asset_state(self._session, self.asset.id, "file_count", "42")
+ self._session.commit()
+
+ response = test_client.get(self._base_url)
+ assert response.status_code == 200
+ data = response.json()
+ assert data["total_entries"] == 2
+ keys = {item["key"]: item["value"] for item in data["asset_states"]}
+ assert keys == {"watermark": "2026-05-01", "file_count": "42"}
+
+ def test_returns_metadata_fields(self, test_client):
+ _create_asset_state(self._session, self.asset.id, "watermark",
"2026-05-01")
+ self._session.commit()
+
+ item = test_client.get(self._base_url).json()["asset_states"][0]
+ assert "updated_at" in item
+ assert item["key"] == "watermark"
+
+ def test_pagination_limit(self, test_client):
+ for k in ("watermark", "file_count", "last_run"):
+ _create_asset_state(self._session, self.asset.id, k, "v")
+ self._session.commit()
+
+ response = test_client.get(f"{self._base_url}?limit=2")
+ data = response.json()
+ assert data["total_entries"] == 3
+ assert len(data["asset_states"]) == 2
+
+ def test_pagination_offset(self, test_client):
+ for k in ("watermark", "file_count", "last_run"):
+ _create_asset_state(self._session, self.asset.id, k, "v")
+ self._session.commit()
+
+ response = test_client.get(f"{self._base_url}?limit=2&offset=2")
+ data = response.json()
+ assert data["total_entries"] == 3
+ assert len(data["asset_states"]) == 1
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert unauthenticated_test_client.get(self._base_url).status_code ==
401
+
+
+class TestGetAssetState(TestAssetStateEndpoint):
+ def test_returns_value(self, test_client):
+ _create_asset_state(self._session, self.asset.id, "watermark",
"2026-05-01")
+ self._session.commit()
+
+ response = test_client.get(f"{self._base_url}/watermark")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["key"] == "watermark"
+ assert data["value"] == "2026-05-01"
+ assert "updated_at" in data
+
+ def test_missing_key_returns_404(self, test_client):
+ assert test_client.get(f"{self._base_url}/nonexistent").status_code ==
404
+
+ def test_key_with_slash_is_supported(self, test_client):
+ """Keys containing slashes must work — route uses {key:path}."""
+ _create_asset_state(self._session, self.asset.id, "partition/date",
"2026-05-01")
+ self._session.commit()
+
+ response = test_client.get(f"{self._base_url}/partition/date")
+ assert response.status_code == 200
+ assert response.json()["key"] == "partition/date"
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert
unauthenticated_test_client.get(f"{self._base_url}/watermark").status_code ==
401
+
+
+class TestSetAssetState(TestAssetStateEndpoint):
+ def test_creates_new_key(self, test_client):
+ response = test_client.put(f"{self._base_url}/watermark",
json={"value": "2026-05-01"})
+ assert response.status_code == 204
+
+ assert test_client.get(f"{self._base_url}/watermark").json()["value"]
== "2026-05-01"
+
+ def test_overwrites_existing_key(self, test_client):
+ test_client.put(f"{self._base_url}/watermark", json={"value": "v1"})
+ test_client.put(f"{self._base_url}/watermark", json={"value": "v2"})
+
+ assert test_client.get(f"{self._base_url}/watermark").json()["value"]
== "v2"
+
+ def test_empty_body_returns_422(self, test_client):
+ assert test_client.put(f"{self._base_url}/watermark",
json={}).status_code == 422
+
+ def test_oversized_value_returns_422(self, test_client):
+ assert test_client.put(f"{self._base_url}/watermark", json={"value":
"x" * 65536}).status_code == 422
+
+ def test_key_with_slash_is_supported(self, test_client):
+ response = test_client.put(f"{self._base_url}/partition/date",
json={"value": "2026-05-01"})
+ assert response.status_code == 204
+ assert
test_client.get(f"{self._base_url}/partition/date").json()["key"] ==
"partition/date"
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert (
+ unauthenticated_test_client.put(f"{self._base_url}/watermark",
json={"value": "v"}).status_code
+ == 401
+ )
+
+
+class TestDeleteAssetState(TestAssetStateEndpoint):
+ def test_deletes_key(self, test_client):
+ _create_asset_state(self._session, self.asset.id, "watermark",
"2026-05-01")
+ self._session.commit()
+
+ assert test_client.delete(f"{self._base_url}/watermark").status_code
== 204
+ assert test_client.get(f"{self._base_url}/watermark").status_code ==
404
+
+ def test_delete_noop_for_missing_key(self, test_client):
+ assert test_client.delete(f"{self._base_url}/nonexistent").status_code
== 204
+
+ def test_only_deletes_target_key(self, test_client):
+ _create_asset_state(self._session, self.asset.id, "watermark", "a")
+ _create_asset_state(self._session, self.asset.id, "file_count", "b")
+ self._session.commit()
+
+ test_client.delete(f"{self._base_url}/watermark")
+
+ assert test_client.get(f"{self._base_url}/watermark").status_code ==
404
+ assert test_client.get(f"{self._base_url}/file_count").json()["value"]
== "b"
+
+ def test_key_with_slash_is_supported(self, test_client):
+ _create_asset_state(self._session, self.asset.id, "partition/date",
"v")
+ self._session.commit()
+
+ assert
test_client.delete(f"{self._base_url}/partition/date").status_code == 204
+ assert test_client.get(f"{self._base_url}/partition/date").status_code
== 404
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert
unauthenticated_test_client.delete(f"{self._base_url}/watermark").status_code
== 401
+
+
+class TestClearAssetState(TestAssetStateEndpoint):
+ def test_clears_all_keys(self, test_client):
+ for k, v in [("watermark", "a"), ("file_count", "b"), ("last_run",
"c")]:
+ _create_asset_state(self._session, self.asset.id, k, v)
+ self._session.commit()
+
+ assert test_client.delete(self._base_url).status_code == 204
+ assert test_client.get(self._base_url).json()["total_entries"] == 0
+
+ def test_clear_is_noop_when_no_state(self, test_client):
+ assert test_client.delete(self._base_url).status_code == 204
+
+ def test_clear_does_not_affect_other_assets(self, test_client):
+ other_asset = AssetModel(uri="s3://other/asset", name="other_asset",
group="test")
+ self._session.add(other_asset)
+ self._session.flush()
+ _create_asset_state(self._session, self.asset.id, "watermark", "mine")
+ _create_asset_state(self._session, other_asset.id, "watermark",
"theirs")
+ self._session.commit()
+
+ test_client.delete(self._base_url)
+
+ other_url = f"/assets/{other_asset.id}/states"
+ assert test_client.get(f"{other_url}/watermark").json()["value"] ==
"theirs"
+
+ def test_clears_slash_keyed_entries(self, test_client):
+ _create_asset_state(self._session, self.asset.id, "partition/date",
"v")
+ self._session.commit()
+
+ assert test_client.delete(self._base_url).status_code == 204
+ assert test_client.get(self._base_url).json()["total_entries"] == 0
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert unauthenticated_test_client.delete(self._base_url).status_code
== 401
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
new file mode 100644
index 00000000000..c53212fa5e4
--- /dev/null
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
@@ -0,0 +1,293 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import pytest
+from sqlalchemy import select
+
+from airflow._shared.timezones import timezone
+from airflow.models.dagrun import DagRun
+from airflow.models.task_state import TaskStateModel
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.utils.types import DagRunType
+
+from tests_common.test_utils.db import clear_db_dag_bundles, clear_db_dags,
clear_db_runs
+
+pytestmark = pytest.mark.db_test
+
+DAG_ID = "test_dag"
+TASK_ID = "test_task"
+LOGICAL_DATE = timezone.datetime(2026, 1, 1)
+RUN_ID = DagRun.generate_run_id(run_type=DagRunType.MANUAL,
logical_date=LOGICAL_DATE, run_after=LOGICAL_DATE)
+
+BASE_URL = f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/{TASK_ID}/states"
+
+
+def _create_dag_run(dag_maker, session):
+ with dag_maker(DAG_ID, schedule=None, start_date=LOGICAL_DATE):
+ EmptyOperator(task_id=TASK_ID)
+ dag_maker.create_dagrun(run_id=RUN_ID, run_type=DagRunType.MANUAL,
logical_date=LOGICAL_DATE)
+ dag_maker.sync_dagbag_to_db()
+ session.merge(dag_maker.dag_model)
+ session.commit()
+
+
+def _create_task_state(session, key: str, value: str, dag_run: DagRun) -> None:
+ row = TaskStateModel(
+ dag_run_id=dag_run.id,
+ dag_id=DAG_ID,
+ run_id=RUN_ID,
+ task_id=TASK_ID,
+ map_index=-1,
+ key=key,
+ value=value,
+ )
+ session.add(row)
+ session.flush()
+
+
+class TestTaskStateEndpoint:
+ @staticmethod
+ def clear_db():
+ clear_db_dags()
+ clear_db_runs()
+ clear_db_dag_bundles()
+
+ @pytest.fixture(autouse=True)
+ def setup(self, dag_maker, session):
+ self.clear_db()
+ _create_dag_run(dag_maker, session)
+ self.dag_run = session.scalar(select(DagRun).where(DagRun.run_id ==
RUN_ID))
+ self._session = session
+
+ def teardown_method(self):
+ self.clear_db()
+
+
+class TestListTaskState(TestTaskStateEndpoint):
+ def test_returns_empty_list_when_no_state(self, test_client):
+ response = test_client.get(BASE_URL)
+ assert response.status_code == 200
+ assert response.json() == {"task_states": [], "total_entries": 0}
+
+ def test_returns_all_keys(self, test_client):
+ _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+ _create_task_state(self._session, "checkpoint", "step_3", self.dag_run)
+ self._session.commit()
+
+ response = test_client.get(BASE_URL)
+ assert response.status_code == 200
+ data = response.json()
+ assert data["total_entries"] == 2
+ keys = {item["key"]: item["value"] for item in data["task_states"]}
+ assert keys == {"job_id": "spark_001", "checkpoint": "step_3"}
+
+ def test_returns_state_metadata_fields(self, test_client):
+ _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+ self._session.commit()
+
+ response = test_client.get(BASE_URL)
+ item = response.json()["task_states"][0]
+ assert "updated_at" in item
+ assert "expires_at" in item
+
+ def test_map_index_isolation(self, test_client):
+ """map_index=-1 (default) doesn't return rows for other map indices."""
+ row = TaskStateModel(
+ dag_run_id=self.dag_run.id,
+ dag_id=DAG_ID,
+ run_id=RUN_ID,
+ task_id=TASK_ID,
+ map_index=0,
+ key="job_id",
+ value="mapped_app",
+ )
+ self._session.add(row)
+ self._session.commit()
+
+ response = test_client.get(BASE_URL)
+ assert response.json()["total_entries"] == 0
+
+ def test_pagination_limit(self, test_client):
+ for k in ("a", "b", "c"):
+ _create_task_state(self._session, k, "v", self.dag_run)
+ self._session.commit()
+
+ response = test_client.get(f"{BASE_URL}?limit=2")
+ data = response.json()
+ assert data["total_entries"] == 3
+ assert len(data["task_states"]) == 2
+
+ def test_pagination_offset(self, test_client):
+ for k in ("a", "b", "c"):
+ _create_task_state(self._session, k, "v", self.dag_run)
+ self._session.commit()
+
+ response = test_client.get(f"{BASE_URL}?limit=2&offset=2")
+ data = response.json()
+ assert data["total_entries"] == 3
+ assert len(data["task_states"]) == 1
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert unauthenticated_test_client.get(BASE_URL).status_code == 401
+
+
+class TestGetTaskState(TestTaskStateEndpoint):
+ def test_returns_value(self, test_client):
+ _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+ self._session.commit()
+
+ response = test_client.get(f"{BASE_URL}/job_id")
+ assert response.status_code == 200
+ data = response.json()
+ assert data["key"] == "job_id"
+ assert data["value"] == "spark_001"
+
+ def test_missing_key_returns_404(self, test_client):
+ response = test_client.get(f"{BASE_URL}/nonexistent")
+ assert response.status_code == 404
+
+ def test_key_with_slash_is_supported(self, test_client):
+ """Keys containing slashes must work — route uses {key:path}."""
+ _create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
+ self._session.commit()
+
+ response = test_client.get(f"{BASE_URL}/workflow/step_1")
+ assert response.status_code == 200
+ assert response.json()["key"] == "workflow/step_1"
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert
unauthenticated_test_client.get(f"{BASE_URL}/job_id").status_code == 401
+
+
+class TestSetTaskState(TestTaskStateEndpoint):
+ def test_creates_new_key(self, test_client):
+ response = test_client.put(f"{BASE_URL}/job_id", json={"value":
"spark_001"})
+ assert response.status_code == 204
+
+ get_resp = test_client.get(f"{BASE_URL}/job_id")
+ assert get_resp.json()["value"] == "spark_001"
+
+ def test_overwrites_existing_key(self, test_client):
+ test_client.put(f"{BASE_URL}/job_id", json={"value": "v1"})
+ test_client.put(f"{BASE_URL}/job_id", json={"value": "v2"})
+
+ assert test_client.get(f"{BASE_URL}/job_id").json()["value"] == "v2"
+
+ def test_empty_body_returns_422(self, test_client):
+ assert test_client.put(f"{BASE_URL}/job_id", json={}).status_code ==
422
+
+ def test_oversized_value_returns_422(self, test_client):
+ assert test_client.put(f"{BASE_URL}/job_id", json={"value": "x" *
65536}).status_code == 422
+
+ def test_set_nonexistent_dag_run_returns_404(self, test_client):
+ """set() raises ValueError when DagRun doesn't exist — should surface
as 404."""
+ bad_url =
f"/dags/{DAG_ID}/dagRuns/nonexistent_run/taskInstances/{TASK_ID}/states/job_id"
+ response = test_client.put(bad_url, json={"value": "v"})
+ assert response.status_code == 404
+
+ def test_set_nonexistent_task_id_returns_404(self, test_client):
+ """set() returns 404 when task_id doesn not match any TaskInstance in
the run."""
+ bad_url =
f"/dags/{DAG_ID}/dagRuns/{RUN_ID}/taskInstances/nonexistent_task/states/job_id"
+ response = test_client.put(bad_url, json={"value": "v"})
+ assert response.status_code == 404
+
+ def test_key_with_slash_is_supported(self, test_client):
+ response = test_client.put(f"{BASE_URL}/workflow/step_1",
json={"value": "v"})
+ assert response.status_code == 204
+ assert test_client.get(f"{BASE_URL}/workflow/step_1").json()["key"] ==
"workflow/step_1"
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert unauthenticated_test_client.put(f"{BASE_URL}/job_id",
json={"value": "v"}).status_code == 401
+
+
+class TestDeleteTaskState(TestTaskStateEndpoint):
+ def test_deletes_key(self, test_client):
+ _create_task_state(self._session, "job_id", "spark_001", self.dag_run)
+ self._session.commit()
+
+ assert test_client.delete(f"{BASE_URL}/job_id").status_code == 204
+ assert test_client.get(f"{BASE_URL}/job_id").status_code == 404
+
+ def test_delete_noop_for_missing_key(self, test_client):
+ assert test_client.delete(f"{BASE_URL}/nonexistent").status_code == 204
+
+ def test_only_deletes_target_key(self, test_client):
+ _create_task_state(self._session, "job_id", "a", self.dag_run)
+ _create_task_state(self._session, "checkpoint", "b", self.dag_run)
+ self._session.commit()
+
+ test_client.delete(f"{BASE_URL}/job_id")
+
+ assert test_client.get(f"{BASE_URL}/job_id").status_code == 404
+ assert test_client.get(f"{BASE_URL}/checkpoint").json()["value"] == "b"
+
+ def test_key_with_slash_is_supported(self, test_client):
+ _create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
+ self._session.commit()
+
+ assert test_client.delete(f"{BASE_URL}/workflow/step_1").status_code
== 204
+ assert test_client.get(f"{BASE_URL}/workflow/step_1").status_code ==
404
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert
unauthenticated_test_client.delete(f"{BASE_URL}/job_id").status_code == 401
+
+
+class TestClearTaskState(TestTaskStateEndpoint):
+ def test_clears_all_keys(self, test_client):
+ for k, v in [("job_id", "a"), ("checkpoint", "b"), ("retry_count",
"c")]:
+ _create_task_state(self._session, k, v, self.dag_run)
+ self._session.commit()
+
+ assert test_client.delete(BASE_URL).status_code == 204
+ assert test_client.get(BASE_URL).json()["total_entries"] == 0
+
+ def test_all_map_indices_clears_across_mapped_instances(self, test_client):
+ """all_map_indices=true wipes state for every map index of the task."""
+ for map_index in (-1, 0, 1):
+ row = TaskStateModel(
+ dag_run_id=self.dag_run.id,
+ dag_id=DAG_ID,
+ run_id=RUN_ID,
+ task_id=TASK_ID,
+ map_index=map_index,
+ key="job_id",
+ value=f"app_{map_index}",
+ )
+ self._session.add(row)
+ self._session.commit()
+
+ # Default clear only wipes map_index=-1
+ assert test_client.delete(BASE_URL).status_code == 204
+ # map_index=0 and map_index=1 rows still exist
+ assert
test_client.get(f"{BASE_URL}?map_index=0").json()["total_entries"] == 1
+ assert
test_client.get(f"{BASE_URL}?map_index=1").json()["total_entries"] == 1
+
+ # all_map_indices=true wipes everything
+ assert
test_client.delete(f"{BASE_URL}?all_map_indices=true").status_code == 204
+ assert
test_client.get(f"{BASE_URL}?map_index=0").json()["total_entries"] == 0
+ assert
test_client.get(f"{BASE_URL}?map_index=1").json()["total_entries"] == 0
+
+ def test_key_with_slash_is_supported(self, test_client):
+ _create_task_state(self._session, "workflow/step_1", "v", self.dag_run)
+ self._session.commit()
+
+ assert test_client.delete(BASE_URL).status_code == 204
+ assert test_client.get(BASE_URL).json()["total_entries"] == 0
+
+ def test_unauthorized_returns_401(self, unauthenticated_test_client):
+ assert unauthenticated_test_client.delete(BASE_URL).status_code == 401
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 276c8699de0..6886bdd3aa7 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -49,6 +49,27 @@ class AssetAliasResponse(BaseModel):
group: Annotated[str, Field(title="Group")]
+class AssetStateBody(BaseModel):
+ """
+ Request body for setting an asset state value.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ value: Annotated[str, Field(max_length=65535, title="Value")]
+
+
+class AssetStateResponse(BaseModel):
+ """
+ A single asset state key/value pair with metadata.
+ """
+
+ key: Annotated[str, Field(title="Key")]
+ value: Annotated[str, Field(title="Value")]
+ updated_at: Annotated[datetime, Field(title="Updated At")]
+
+
class AssetWatcherResponse(BaseModel):
"""
Asset watcher serializer for responses.
@@ -906,6 +927,28 @@ class TaskOutletAssetReference(BaseModel):
updated_at: Annotated[datetime, Field(title="Updated At")]
+class TaskStateBody(BaseModel):
+ """
+ Request body for setting a task state value.
+ """
+
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ value: Annotated[str, Field(max_length=65535, title="Value")]
+
+
+class TaskStateResponse(BaseModel):
+ """
+ A single task state key/value pair with metadata.
+ """
+
+ key: Annotated[str, Field(title="Key")]
+ value: Annotated[str, Field(title="Value")]
+ updated_at: Annotated[datetime, Field(title="Updated At")]
+ expires_at: Annotated[datetime | None, Field(title="Expires At")] = None
+
+
class TimeDelta(BaseModel):
"""
TimeDelta can be used to interact with datetime.timedelta objects.
@@ -1136,6 +1179,15 @@ class AssetResponse(BaseModel):
last_asset_event: LastAssetEventResponse | None = None
+class AssetStateCollectionResponse(BaseModel):
+ """
+ All asset state entries for an asset.
+ """
+
+ asset_states: Annotated[list[AssetStateResponse], Field(title="Asset
States")]
+ total_entries: Annotated[int, Field(title="Total Entries")]
+
+
class BackfillPostBody(BaseModel):
"""
Object used for create backfill request.
@@ -1854,6 +1906,15 @@ class TaskResponse(BaseModel):
]
+class TaskStateCollectionResponse(BaseModel):
+ """
+ All task state entries for a task instance.
+ """
+
+ task_states: Annotated[list[TaskStateResponse], Field(title="Task States")]
+ total_entries: Annotated[int, Field(title="Total Entries")]
+
+
class VariableCollectionResponse(BaseModel):
"""
Variable Collection serializer for responses.