This is an automated email from the ASF dual-hosted git repository.

amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c1299bd3437 Unify task/asset state storage between Core API and 
Execution API (#67547)
c1299bd3437 is described below

commit c1299bd3437d356dac0821a6ea2dac4d1e3f8e5e
Author: Amogh Desai <[email protected]>
AuthorDate: Fri May 29 12:11:25 2026 +0530

    Unify task/asset state storage between Core API and Execution API (#67547)
---
 .../api_fastapi/core_api/datamodels/asset_state.py | 22 +++++++--
 .../api_fastapi/core_api/datamodels/task_state.py  | 22 +++++++--
 .../core_api/openapi/v2-rest-api-generated.yaml    | 14 ++----
 .../core_api/routes/public/asset_state.py          |  9 ++--
 .../core_api/routes/public/task_state.py           |  9 ++--
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 14 ++----
 .../airflow/ui/openapi-gen/requests/types.gen.ts   |  8 ++--
 .../core_api/routes/public/test_asset_state.py     | 48 ++++++++++++++++++-
 .../core_api/routes/public/test_task_state.py      | 53 +++++++++++++++++++--
 .../src/airflowctl/api/datamodels/generated.py     | 54 +++++++++++-----------
 10 files changed, 186 insertions(+), 67 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
index 379d7802909..a373fe89d2d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
@@ -16,18 +16,21 @@
 # under the License.
 from __future__ import annotations
 
+import json
 from datetime import datetime
 
-from pydantic import Field
+from pydantic import JsonValue, field_validator
 
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 
+_MAX_SERIALIZED_BYTES = 65535
+
 
 class AssetStateResponse(BaseModel):
     """A single asset state key/value pair with metadata."""
 
     key: str
-    value: str
+    value: JsonValue
     updated_at: datetime
 
 
@@ -41,4 +44,17 @@ class AssetStateCollectionResponse(BaseModel):
 class AssetStateBody(StrictBaseModel):
     """Request body for setting an asset state value."""
 
-    value: str = Field(max_length=65535)
+    value: JsonValue
+
+    @field_validator("value")
+    @classmethod
+    def value_is_json_representable(cls, v: JsonValue) -> JsonValue:
+        if v is None:
+            raise ValueError("value cannot be null")
+        try:
+            serialized = json.dumps(v, allow_nan=False)
+        except ValueError:
+            raise ValueError("value contains non-finite numbers; NaN and Inf 
are not JSON representable")
+        if len(serialized) > _MAX_SERIALIZED_BYTES:
+            raise ValueError(f"value exceeds maximum serialized size of 
{_MAX_SERIALIZED_BYTES} bytes")
+        return v
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
index 856de74a087..e6622f842e1 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
@@ -16,18 +16,21 @@
 # under the License.
 from __future__ import annotations
 
+import json
 from datetime import datetime
 
-from pydantic import Field
+from pydantic import JsonValue, field_validator
 
 from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
 
+_MAX_SERIALIZED_BYTES = 65535
+
 
 class TaskStateResponse(BaseModel):
     """A single task state key/value pair with metadata."""
 
     key: str
-    value: str
+    value: JsonValue
     updated_at: datetime
     expires_at: datetime | None
 
@@ -42,4 +45,17 @@ class TaskStateCollectionResponse(BaseModel):
 class TaskStateBody(StrictBaseModel):
     """Request body for setting a task state value."""
 
-    value: str = Field(max_length=65535)
+    value: JsonValue
+
+    @field_validator("value")
+    @classmethod
+    def value_is_json_representable(cls, v: JsonValue) -> JsonValue:
+        if v is None:
+            raise ValueError("value cannot be null")
+        try:
+            serialized = json.dumps(v, allow_nan=False)
+        except ValueError:
+            raise ValueError("value contains non-finite numbers; NaN and Inf 
are not JSON representable")
+        if len(serialized) > _MAX_SERIALIZED_BYTES:
+            raise ValueError(f"value exceeds maximum serialized size of 
{_MAX_SERIALIZED_BYTES} bytes")
+        return v
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 618356a1ce7..df21d7570c5 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11360,9 +11360,7 @@ components:
     AssetStateBody:
       properties:
         value:
-          type: string
-          maxLength: 65535
-          title: Value
+          $ref: '#/components/schemas/JsonValue'
       additionalProperties: false
       type: object
       required:
@@ -11391,8 +11389,7 @@ components:
           type: string
           title: Key
         value:
-          type: string
-          title: Value
+          $ref: '#/components/schemas/JsonValue'
         updated_at:
           type: string
           format: date-time
@@ -15810,9 +15807,7 @@ components:
     TaskStateBody:
       properties:
         value:
-          type: string
-          maxLength: 65535
-          title: Value
+          $ref: '#/components/schemas/JsonValue'
       additionalProperties: false
       type: object
       required:
@@ -15841,8 +15836,7 @@ components:
           type: string
           title: Key
         value:
-          type: string
-          title: Value
+          $ref: '#/components/schemas/JsonValue'
         updated_at:
           type: string
           format: date-time
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
index 43580e0c762..877bd5f8f44 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import json
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, status
@@ -85,7 +86,9 @@ def list_asset_states(
         session=session,
     )
     rows = session.execute(paginated).all()
-    entries = [AssetStateResponse(key=r.key, value=r.value, 
updated_at=r.updated_at) for r in rows]
+    entries = [
+        AssetStateResponse(key=r.key, value=json.loads(r.value), 
updated_at=r.updated_at) for r in rows
+    ]
     return AssetStateCollectionResponse(asset_states=entries, 
total_entries=total_entries)
 
 
@@ -115,7 +118,7 @@ def get_asset_state(
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Asset state key {key!r} not found",
         )
-    return AssetStateResponse(key=row.key, value=row.value, 
updated_at=row.updated_at)
+    return AssetStateResponse(key=row.key, value=json.loads(row.value), 
updated_at=row.updated_at)
 
 
 @asset_state_router.put(
@@ -131,7 +134,7 @@ def set_asset_state(
     session: SessionDep,
 ) -> None:
     """Set an asset state value. Creates or overwrites the key."""
-    get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value, 
session=session)
+    get_state_backend().set(AssetScope(asset_id=asset_id), key, 
json.dumps(body.value), session=session)
 
 
 @asset_state_router.delete(
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
index 138380232a8..31cc7272ddc 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
@@ -16,6 +16,7 @@
 # under the License.
 from __future__ import annotations
 
+import json
 from typing import Annotated
 
 from fastapi import Depends, HTTPException, Query, status
@@ -87,7 +88,9 @@ def list_task_states(
     )
     rows = session.execute(paginated).all()
     entries = [
-        TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at, 
expires_at=r.expires_at)
+        TaskStateResponse(
+            key=r.key, value=json.loads(r.value), updated_at=r.updated_at, 
expires_at=r.expires_at
+        )
         for r in rows
     ]
     return TaskStateCollectionResponse(task_states=entries, 
total_entries=total_entries)
@@ -127,7 +130,7 @@ def get_task_state(
             detail=f"Task state key {key!r} not found",
         )
     return TaskStateResponse(
-        key=row.key, value=row.value, updated_at=row.updated_at, 
expires_at=row.expires_at
+        key=row.key, value=json.loads(row.value), updated_at=row.updated_at, 
expires_at=row.expires_at
     )
 
 
@@ -162,7 +165,7 @@ def set_task_state(
         )
     scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
     try:
-        get_state_backend().set(scope, key, body.value, session=session)
+        get_state_backend().set(scope, key, json.dumps(body.value), 
session=session)
     except ValueError as e:
         raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, 
detail=str(e)) from e
 
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index f14d1a07f3d..e03a9e6bada 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -384,9 +384,7 @@ export const $AssetResponse = {
 export const $AssetStateBody = {
     properties: {
         value: {
-            type: 'string',
-            maxLength: 65535,
-            title: 'Value'
+            '$ref': '#/components/schemas/JsonValue'
         }
     },
     additionalProperties: false,
@@ -423,8 +421,7 @@ export const $AssetStateResponse = {
             title: 'Key'
         },
         value: {
-            type: 'string',
-            title: 'Value'
+            '$ref': '#/components/schemas/JsonValue'
         },
         updated_at: {
             type: 'string',
@@ -6982,9 +6979,7 @@ export const $TaskResponse = {
 export const $TaskStateBody = {
     properties: {
         value: {
-            type: 'string',
-            maxLength: 65535,
-            title: 'Value'
+            '$ref': '#/components/schemas/JsonValue'
         }
     },
     additionalProperties: false,
@@ -7021,8 +7016,7 @@ export const $TaskStateResponse = {
             title: 'Key'
         },
         value: {
-            type: 'string',
-            title: 'Value'
+            '$ref': '#/components/schemas/JsonValue'
         },
         updated_at: {
             type: 'string',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 7c2d5c0ca99..85a37588df6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -100,7 +100,7 @@ export type AssetResponse = {
  * Request body for setting an asset state value.
  */
 export type AssetStateBody = {
-    value: string;
+    value: JsonValue;
 };
 
 /**
@@ -116,7 +116,7 @@ export type AssetStateCollectionResponse = {
  */
 export type AssetStateResponse = {
     key: string;
-    value: string;
+    value: JsonValue;
     updated_at: string;
 };
 
@@ -1715,7 +1715,7 @@ export type TaskResponse = {
  * Request body for setting a task state value.
  */
 export type TaskStateBody = {
-    value: string;
+    value: JsonValue;
 };
 
 /**
@@ -1731,7 +1731,7 @@ export type TaskStateCollectionResponse = {
  */
 export type TaskStateResponse = {
     key: string;
-    value: string;
+    value: JsonValue;
     updated_at: string;
     expires_at: string | null;
 };
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
index c53fbb99ee9..4615b9fc785 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
@@ -16,8 +16,13 @@
 # under the License.
 from __future__ import annotations
 
+import json
+
 import pytest
+from pydantic import ValidationError
+from sqlalchemy import select
 
+from airflow.api_fastapi.core_api.datamodels.asset_state import AssetStateBody
 from airflow.models.asset import AssetModel
 from airflow.models.asset_state import AssetStateModel
 
@@ -37,7 +42,7 @@ def _create_asset(session) -> AssetModel:
 
 
 def _create_asset_state(session, asset_id: int, key: str, value: str) -> None:
-    row = AssetStateModel(asset_id=asset_id, key=key, value=value)
+    row = AssetStateModel(asset_id=asset_id, key=key, value=json.dumps(value))
     session.add(row)
     session.flush()
 
@@ -172,9 +177,50 @@ class TestSetAssetState(TestAssetStateEndpoint):
     def test_empty_body_returns_422(self, test_client):
         assert test_client.put(f"{self._base_url}/watermark", 
json={}).status_code == 422
 
+    def test_null_value_returns_422(self, test_client):
+        assert test_client.put(f"{self._base_url}/watermark", json={"value": 
None}).status_code == 422
+
     def test_oversized_value_returns_422(self, test_client):
         assert test_client.put(f"{self._base_url}/watermark", json={"value": 
"x" * 65536}).status_code == 422
 
+    @pytest.mark.parametrize("bad_value", [float("nan"), float("inf"), {"a": 
float("nan")}, [float("inf")]])
+    def test_non_finite_float_rejected_by_validator(self, bad_value):
+        with pytest.raises(ValidationError, match="non-finite"):
+            AssetStateBody(value=bad_value)
+
+    @pytest.mark.parametrize(
+        ("value", "expected_db"),
+        [
+            (42, "42"),
+            ("hello", '"hello"'),
+            ({"k": 1}, '{"k": 1}'),
+            ([1, 2], "[1, 2]"),
+        ],
+    )
+    def test_put_stores_json_encoded_value(self, test_client, value, 
expected_db):
+        test_client.put(f"{self._base_url}/k", json={"value": value})
+        row = self._session.scalar(
+            select(AssetStateModel).where(
+                AssetStateModel.asset_id == self.asset.id,
+                AssetStateModel.key == "k",
+            )
+        )
+        assert row is not None
+        assert row.value == expected_db
+
+    @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"], 
"hello"])
+    def test_core_api_write_read_roundtrip(self, test_client, value):
+        """Core API write then Core API read returns the same native value."""
+        test_client.put(f"{self._base_url}/k", json={"value": value})
+        assert test_client.get(f"{self._base_url}/k").json()["value"] == value
+
+    @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"], 
"hello"])
+    def test_worker_write_core_api_read_roundtrip(self, test_client, value):
+        """Worker write (json.dumps in DB) then Core API read returns native 
value."""
+        _create_asset_state(self._session, self.asset.id, "k", value)
+        self._session.commit()
+        assert test_client.get(f"{self._base_url}/k").json()["value"] == value
+
     def test_key_with_slash_is_supported(self, test_client):
         response = test_client.put(f"{self._base_url}/partition/date", 
json={"value": "2026-05-01"})
         assert response.status_code == 204
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
index c53212fa5e4..e96aca22cdd 100644
--- 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
+++ 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
@@ -16,10 +16,14 @@
 # under the License.
 from __future__ import annotations
 
+import json
+
 import pytest
+from pydantic import ValidationError
 from sqlalchemy import select
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.core_api.datamodels.task_state import TaskStateBody
 from airflow.models.dagrun import DagRun
 from airflow.models.task_state import TaskStateModel
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -54,7 +58,7 @@ def _create_task_state(session, key: str, value: str, 
dag_run: DagRun) -> None:
         task_id=TASK_ID,
         map_index=-1,
         key=key,
-        value=value,
+        value=json.dumps(value),
     )
     session.add(row)
     session.flush()
@@ -114,7 +118,7 @@ class TestListTaskState(TestTaskStateEndpoint):
             task_id=TASK_ID,
             map_index=0,
             key="job_id",
-            value="mapped_app",
+            value=json.dumps("mapped_app"),
         )
         self._session.add(row)
         self._session.commit()
@@ -191,9 +195,17 @@ class TestSetTaskState(TestTaskStateEndpoint):
     def test_empty_body_returns_422(self, test_client):
         assert test_client.put(f"{BASE_URL}/job_id", json={}).status_code == 
422
 
+    def test_null_value_returns_422(self, test_client):
+        assert test_client.put(f"{BASE_URL}/job_id", json={"value": 
None}).status_code == 422
+
     def test_oversized_value_returns_422(self, test_client):
         assert test_client.put(f"{BASE_URL}/job_id", json={"value": "x" * 
65536}).status_code == 422
 
+    @pytest.mark.parametrize("bad_value", [float("nan"), float("inf"), {"a": 
float("nan")}, [float("inf")]])
+    def test_non_finite_float_rejected_by_validator(self, bad_value):
+        with pytest.raises(ValidationError, match="non-finite"):
+            TaskStateBody(value=bad_value)
+
     def test_set_nonexistent_dag_run_returns_404(self, test_client):
         """set() raises ValueError when DagRun doesn't exist — should surface 
as 404."""
         bad_url = 
f"/dags/{DAG_ID}/dagRuns/nonexistent_run/taskInstances/{TASK_ID}/states/job_id"
@@ -206,6 +218,41 @@ class TestSetTaskState(TestTaskStateEndpoint):
         response = test_client.put(bad_url, json={"value": "v"})
         assert response.status_code == 404
 
+    @pytest.mark.parametrize(
+        ("value", "expected_db"),
+        [
+            (42, "42"),
+            ("hello", '"hello"'),
+            ({"k": 1}, '{"k": 1}'),
+            ([1, 2], "[1, 2]"),
+        ],
+    )
+    def test_put_stores_json_encoded_value(self, test_client, value, 
expected_db):
+        test_client.put(f"{BASE_URL}/k", json={"value": value})
+        row = self._session.scalar(
+            select(TaskStateModel).where(
+                TaskStateModel.dag_id == DAG_ID,
+                TaskStateModel.run_id == RUN_ID,
+                TaskStateModel.task_id == TASK_ID,
+                TaskStateModel.key == "k",
+            )
+        )
+        assert row is not None
+        assert row.value == expected_db
+
+    @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"], 
"hello"])
+    def test_core_api_write_read_roundtrip(self, test_client, value):
+        """Core API write then Core API read returns the same native value."""
+        test_client.put(f"{BASE_URL}/k", json={"value": value})
+        assert test_client.get(f"{BASE_URL}/k").json()["value"] == value
+
+    @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"], 
"hello"])
+    def test_worker_write_core_api_read_roundtrip(self, test_client, value):
+        """Worker write (json.dumps in DB) then Core API read returns native 
value."""
+        _create_task_state(self._session, "k", value, self.dag_run)
+        self._session.commit()
+        assert test_client.get(f"{BASE_URL}/k").json()["value"] == value
+
     def test_key_with_slash_is_supported(self, test_client):
         response = test_client.put(f"{BASE_URL}/workflow/step_1", 
json={"value": "v"})
         assert response.status_code == 204
@@ -266,7 +313,7 @@ class TestClearTaskState(TestTaskStateEndpoint):
                 task_id=TASK_ID,
                 map_index=map_index,
                 key="job_id",
-                value=f"app_{map_index}",
+                value=json.dumps(f"app_{map_index}"),
             )
             self._session.add(row)
         self._session.commit()
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index f05fa65cf56..44ac8d39387 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -49,27 +49,6 @@ class AssetAliasResponse(BaseModel):
     group: Annotated[str, Field(title="Group")]
 
 
-class AssetStateBody(BaseModel):
-    """
-    Request body for setting an asset state value.
-    """
-
-    model_config = ConfigDict(
-        extra="forbid",
-    )
-    value: Annotated[str, Field(max_length=65535, title="Value")]
-
-
-class AssetStateResponse(BaseModel):
-    """
-    A single asset state key/value pair with metadata.
-    """
-
-    key: Annotated[str, Field(title="Key")]
-    value: Annotated[str, Field(title="Value")]
-    updated_at: Annotated[datetime, Field(title="Updated At")]
-
-
 class AssetWatcherResponse(BaseModel):
     """
     Asset watcher serializer for responses.
@@ -981,7 +960,7 @@ class TaskStateBody(BaseModel):
     model_config = ConfigDict(
         extra="forbid",
     )
-    value: Annotated[str, Field(max_length=65535, title="Value")]
+    value: JsonValue
 
 
 class TaskStateResponse(BaseModel):
@@ -990,7 +969,7 @@ class TaskStateResponse(BaseModel):
     """
 
     key: Annotated[str, Field(title="Key")]
-    value: Annotated[str, Field(title="Value")]
+    value: JsonValue
     updated_at: Annotated[datetime, Field(title="Updated At")]
     expires_at: Annotated[datetime | None, Field(title="Expires At")] = None
 
@@ -1225,13 +1204,25 @@ class AssetResponse(BaseModel):
     last_asset_event: LastAssetEventResponse | None = None
 
 
-class AssetStateCollectionResponse(BaseModel):
+class AssetStateBody(BaseModel):
     """
-    All asset state entries for an asset.
+    Request body for setting an asset state value.
     """
 
-    asset_states: Annotated[list[AssetStateResponse], Field(title="Asset 
States")]
-    total_entries: Annotated[int, Field(title="Total Entries")]
+    model_config = ConfigDict(
+        extra="forbid",
+    )
+    value: JsonValue
+
+
+class AssetStateResponse(BaseModel):
+    """
+    A single asset state key/value pair with metadata.
+    """
+
+    key: Annotated[str, Field(title="Key")]
+    value: JsonValue
+    updated_at: Annotated[datetime, Field(title="Updated At")]
 
 
 class BackfillPostBody(BaseModel):
@@ -2017,6 +2008,15 @@ class AssetEventCollectionResponse(BaseModel):
     total_entries: Annotated[int, Field(title="Total Entries")]
 
 
+class AssetStateCollectionResponse(BaseModel):
+    """
+    All asset state entries for an asset.
+    """
+
+    asset_states: Annotated[list[AssetStateResponse], Field(title="Asset 
States")]
+    total_entries: Annotated[int, Field(title="Total Entries")]
+
+
 class BackfillCollectionResponse(BaseModel):
     """
     Backfill Collection serializer for responses.

Reply via email to