This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c1299bd3437 Unify task/asset state storage between Core API and Execution API (#67547)
c1299bd3437 is described below
commit c1299bd3437d356dac0821a6ea2dac4d1e3f8e5e
Author: Amogh Desai <[email protected]>
AuthorDate: Fri May 29 12:11:25 2026 +0530
Unify task/asset state storage between Core API and Execution API (#67547)
---
.../api_fastapi/core_api/datamodels/asset_state.py | 22 +++++++--
.../api_fastapi/core_api/datamodels/task_state.py | 22 +++++++--
.../core_api/openapi/v2-rest-api-generated.yaml | 14 ++----
.../core_api/routes/public/asset_state.py | 9 ++--
.../core_api/routes/public/task_state.py | 9 ++--
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 14 ++----
.../airflow/ui/openapi-gen/requests/types.gen.ts | 8 ++--
.../core_api/routes/public/test_asset_state.py | 48 ++++++++++++++++++-
.../core_api/routes/public/test_task_state.py | 53 +++++++++++++++++++--
.../src/airflowctl/api/datamodels/generated.py | 54 +++++++++++-----------
10 files changed, 186 insertions(+), 67 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
index 379d7802909..a373fe89d2d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/asset_state.py
@@ -16,18 +16,21 @@
# under the License.
from __future__ import annotations
+import json
from datetime import datetime
-from pydantic import Field
+from pydantic import JsonValue, field_validator
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+_MAX_SERIALIZED_BYTES = 65535
+
class AssetStateResponse(BaseModel):
"""A single asset state key/value pair with metadata."""
key: str
- value: str
+ value: JsonValue
updated_at: datetime
@@ -41,4 +44,17 @@ class AssetStateCollectionResponse(BaseModel):
class AssetStateBody(StrictBaseModel):
"""Request body for setting an asset state value."""
- value: str = Field(max_length=65535)
+ value: JsonValue
+
+ @field_validator("value")
+ @classmethod
+ def value_is_json_representable(cls, v: JsonValue) -> JsonValue:
+ if v is None:
+ raise ValueError("value cannot be null")
+ try:
+ serialized = json.dumps(v, allow_nan=False)
+ except ValueError:
+ raise ValueError("value contains non-finite numbers; NaN and Inf
are not JSON representable")
+ if len(serialized) > _MAX_SERIALIZED_BYTES:
+ raise ValueError(f"value exceeds maximum serialized size of
{_MAX_SERIALIZED_BYTES} bytes")
+ return v
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
index 856de74a087..e6622f842e1 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_state.py
@@ -16,18 +16,21 @@
# under the License.
from __future__ import annotations
+import json
from datetime import datetime
-from pydantic import Field
+from pydantic import JsonValue, field_validator
from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
+_MAX_SERIALIZED_BYTES = 65535
+
class TaskStateResponse(BaseModel):
"""A single task state key/value pair with metadata."""
key: str
- value: str
+ value: JsonValue
updated_at: datetime
expires_at: datetime | None
@@ -42,4 +45,17 @@ class TaskStateCollectionResponse(BaseModel):
class TaskStateBody(StrictBaseModel):
"""Request body for setting a task state value."""
- value: str = Field(max_length=65535)
+ value: JsonValue
+
+ @field_validator("value")
+ @classmethod
+ def value_is_json_representable(cls, v: JsonValue) -> JsonValue:
+ if v is None:
+ raise ValueError("value cannot be null")
+ try:
+ serialized = json.dumps(v, allow_nan=False)
+ except ValueError:
+ raise ValueError("value contains non-finite numbers; NaN and Inf
are not JSON representable")
+ if len(serialized) > _MAX_SERIALIZED_BYTES:
+ raise ValueError(f"value exceeds maximum serialized size of
{_MAX_SERIALIZED_BYTES} bytes")
+ return v
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 618356a1ce7..df21d7570c5 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11360,9 +11360,7 @@ components:
AssetStateBody:
properties:
value:
- type: string
- maxLength: 65535
- title: Value
+ $ref: '#/components/schemas/JsonValue'
additionalProperties: false
type: object
required:
@@ -11391,8 +11389,7 @@ components:
type: string
title: Key
value:
- type: string
- title: Value
+ $ref: '#/components/schemas/JsonValue'
updated_at:
type: string
format: date-time
@@ -15810,9 +15807,7 @@ components:
TaskStateBody:
properties:
value:
- type: string
- maxLength: 65535
- title: Value
+ $ref: '#/components/schemas/JsonValue'
additionalProperties: false
type: object
required:
@@ -15841,8 +15836,7 @@ components:
type: string
title: Key
value:
- type: string
- title: Value
+ $ref: '#/components/schemas/JsonValue'
updated_at:
type: string
format: date-time
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
index 43580e0c762..877bd5f8f44 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/asset_state.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import json
from typing import Annotated
from fastapi import Depends, HTTPException, status
@@ -85,7 +86,9 @@ def list_asset_states(
session=session,
)
rows = session.execute(paginated).all()
- entries = [AssetStateResponse(key=r.key, value=r.value,
updated_at=r.updated_at) for r in rows]
+ entries = [
+ AssetStateResponse(key=r.key, value=json.loads(r.value),
updated_at=r.updated_at) for r in rows
+ ]
return AssetStateCollectionResponse(asset_states=entries,
total_entries=total_entries)
@@ -115,7 +118,7 @@ def get_asset_state(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Asset state key {key!r} not found",
)
- return AssetStateResponse(key=row.key, value=row.value,
updated_at=row.updated_at)
+ return AssetStateResponse(key=row.key, value=json.loads(row.value),
updated_at=row.updated_at)
@asset_state_router.put(
@@ -131,7 +134,7 @@ def set_asset_state(
session: SessionDep,
) -> None:
"""Set an asset state value. Creates or overwrites the key."""
- get_state_backend().set(AssetScope(asset_id=asset_id), key, body.value,
session=session)
+ get_state_backend().set(AssetScope(asset_id=asset_id), key,
json.dumps(body.value), session=session)
@asset_state_router.delete(
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
index 138380232a8..31cc7272ddc 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state.py
@@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations
+import json
from typing import Annotated
from fastapi import Depends, HTTPException, Query, status
@@ -87,7 +88,9 @@ def list_task_states(
)
rows = session.execute(paginated).all()
entries = [
- TaskStateResponse(key=r.key, value=r.value, updated_at=r.updated_at,
expires_at=r.expires_at)
+ TaskStateResponse(
+ key=r.key, value=json.loads(r.value), updated_at=r.updated_at,
expires_at=r.expires_at
+ )
for r in rows
]
return TaskStateCollectionResponse(task_states=entries,
total_entries=total_entries)
@@ -127,7 +130,7 @@ def get_task_state(
detail=f"Task state key {key!r} not found",
)
return TaskStateResponse(
- key=row.key, value=row.value, updated_at=row.updated_at,
expires_at=row.expires_at
+ key=row.key, value=json.loads(row.value), updated_at=row.updated_at,
expires_at=row.expires_at
)
@@ -162,7 +165,7 @@ def set_task_state(
)
scope = _get_scope(dag_id, dag_run_id, task_id, map_index)
try:
- get_state_backend().set(scope, key, body.value, session=session)
+ get_state_backend().set(scope, key, json.dumps(body.value),
session=session)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=str(e)) from e
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index f14d1a07f3d..e03a9e6bada 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -384,9 +384,7 @@ export const $AssetResponse = {
export const $AssetStateBody = {
properties: {
value: {
- type: 'string',
- maxLength: 65535,
- title: 'Value'
+ '$ref': '#/components/schemas/JsonValue'
}
},
additionalProperties: false,
@@ -423,8 +421,7 @@ export const $AssetStateResponse = {
title: 'Key'
},
value: {
- type: 'string',
- title: 'Value'
+ '$ref': '#/components/schemas/JsonValue'
},
updated_at: {
type: 'string',
@@ -6982,9 +6979,7 @@ export const $TaskResponse = {
export const $TaskStateBody = {
properties: {
value: {
- type: 'string',
- maxLength: 65535,
- title: 'Value'
+ '$ref': '#/components/schemas/JsonValue'
}
},
additionalProperties: false,
@@ -7021,8 +7016,7 @@ export const $TaskStateResponse = {
title: 'Key'
},
value: {
- type: 'string',
- title: 'Value'
+ '$ref': '#/components/schemas/JsonValue'
},
updated_at: {
type: 'string',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 7c2d5c0ca99..85a37588df6 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -100,7 +100,7 @@ export type AssetResponse = {
* Request body for setting an asset state value.
*/
export type AssetStateBody = {
- value: string;
+ value: JsonValue;
};
/**
@@ -116,7 +116,7 @@ export type AssetStateCollectionResponse = {
*/
export type AssetStateResponse = {
key: string;
- value: string;
+ value: JsonValue;
updated_at: string;
};
@@ -1715,7 +1715,7 @@ export type TaskResponse = {
* Request body for setting a task state value.
*/
export type TaskStateBody = {
- value: string;
+ value: JsonValue;
};
/**
@@ -1731,7 +1731,7 @@ export type TaskStateCollectionResponse = {
*/
export type TaskStateResponse = {
key: string;
- value: string;
+ value: JsonValue;
updated_at: string;
expires_at: string | null;
};
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
index c53fbb99ee9..4615b9fc785 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_asset_state.py
@@ -16,8 +16,13 @@
# under the License.
from __future__ import annotations
+import json
+
import pytest
+from pydantic import ValidationError
+from sqlalchemy import select
+from airflow.api_fastapi.core_api.datamodels.asset_state import AssetStateBody
from airflow.models.asset import AssetModel
from airflow.models.asset_state import AssetStateModel
@@ -37,7 +42,7 @@ def _create_asset(session) -> AssetModel:
def _create_asset_state(session, asset_id: int, key: str, value: str) -> None:
- row = AssetStateModel(asset_id=asset_id, key=key, value=value)
+ row = AssetStateModel(asset_id=asset_id, key=key, value=json.dumps(value))
session.add(row)
session.flush()
@@ -172,9 +177,50 @@ class TestSetAssetState(TestAssetStateEndpoint):
def test_empty_body_returns_422(self, test_client):
assert test_client.put(f"{self._base_url}/watermark",
json={}).status_code == 422
+ def test_null_value_returns_422(self, test_client):
+ assert test_client.put(f"{self._base_url}/watermark", json={"value":
None}).status_code == 422
+
def test_oversized_value_returns_422(self, test_client):
assert test_client.put(f"{self._base_url}/watermark", json={"value":
"x" * 65536}).status_code == 422
+ @pytest.mark.parametrize("bad_value", [float("nan"), float("inf"), {"a":
float("nan")}, [float("inf")]])
+ def test_non_finite_float_rejected_by_validator(self, bad_value):
+ with pytest.raises(ValidationError, match="non-finite"):
+ AssetStateBody(value=bad_value)
+
+ @pytest.mark.parametrize(
+ ("value", "expected_db"),
+ [
+ (42, "42"),
+ ("hello", '"hello"'),
+ ({"k": 1}, '{"k": 1}'),
+ ([1, 2], "[1, 2]"),
+ ],
+ )
+ def test_put_stores_json_encoded_value(self, test_client, value,
expected_db):
+ test_client.put(f"{self._base_url}/k", json={"value": value})
+ row = self._session.scalar(
+ select(AssetStateModel).where(
+ AssetStateModel.asset_id == self.asset.id,
+ AssetStateModel.key == "k",
+ )
+ )
+ assert row is not None
+ assert row.value == expected_db
+
+ @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"],
"hello"])
+ def test_core_api_write_read_roundtrip(self, test_client, value):
+ """Core API write then Core API read returns the same native value."""
+ test_client.put(f"{self._base_url}/k", json={"value": value})
+ assert test_client.get(f"{self._base_url}/k").json()["value"] == value
+
+ @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"],
"hello"])
+ def test_worker_write_core_api_read_roundtrip(self, test_client, value):
+ """Worker write (json.dumps in DB) then Core API read returns native
value."""
+ _create_asset_state(self._session, self.asset.id, "k", value)
+ self._session.commit()
+ assert test_client.get(f"{self._base_url}/k").json()["value"] == value
+
def test_key_with_slash_is_supported(self, test_client):
response = test_client.put(f"{self._base_url}/partition/date",
json={"value": "2026-05-01"})
assert response.status_code == 204
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
index c53212fa5e4..e96aca22cdd 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_state.py
@@ -16,10 +16,14 @@
# under the License.
from __future__ import annotations
+import json
+
import pytest
+from pydantic import ValidationError
from sqlalchemy import select
from airflow._shared.timezones import timezone
+from airflow.api_fastapi.core_api.datamodels.task_state import TaskStateBody
from airflow.models.dagrun import DagRun
from airflow.models.task_state import TaskStateModel
from airflow.providers.standard.operators.empty import EmptyOperator
@@ -54,7 +58,7 @@ def _create_task_state(session, key: str, value: str,
dag_run: DagRun) -> None:
task_id=TASK_ID,
map_index=-1,
key=key,
- value=value,
+ value=json.dumps(value),
)
session.add(row)
session.flush()
@@ -114,7 +118,7 @@ class TestListTaskState(TestTaskStateEndpoint):
task_id=TASK_ID,
map_index=0,
key="job_id",
- value="mapped_app",
+ value=json.dumps("mapped_app"),
)
self._session.add(row)
self._session.commit()
@@ -191,9 +195,17 @@ class TestSetTaskState(TestTaskStateEndpoint):
def test_empty_body_returns_422(self, test_client):
assert test_client.put(f"{BASE_URL}/job_id", json={}).status_code ==
422
+ def test_null_value_returns_422(self, test_client):
+ assert test_client.put(f"{BASE_URL}/job_id", json={"value":
None}).status_code == 422
+
def test_oversized_value_returns_422(self, test_client):
assert test_client.put(f"{BASE_URL}/job_id", json={"value": "x" *
65536}).status_code == 422
+ @pytest.mark.parametrize("bad_value", [float("nan"), float("inf"), {"a":
float("nan")}, [float("inf")]])
+ def test_non_finite_float_rejected_by_validator(self, bad_value):
+ with pytest.raises(ValidationError, match="non-finite"):
+ TaskStateBody(value=bad_value)
+
def test_set_nonexistent_dag_run_returns_404(self, test_client):
"""set() raises ValueError when DagRun doesn't exist — should surface
as 404."""
bad_url =
f"/dags/{DAG_ID}/dagRuns/nonexistent_run/taskInstances/{TASK_ID}/states/job_id"
@@ -206,6 +218,41 @@ class TestSetTaskState(TestTaskStateEndpoint):
response = test_client.put(bad_url, json={"value": "v"})
assert response.status_code == 404
+ @pytest.mark.parametrize(
+ ("value", "expected_db"),
+ [
+ (42, "42"),
+ ("hello", '"hello"'),
+ ({"k": 1}, '{"k": 1}'),
+ ([1, 2], "[1, 2]"),
+ ],
+ )
+ def test_put_stores_json_encoded_value(self, test_client, value,
expected_db):
+ test_client.put(f"{BASE_URL}/k", json={"value": value})
+ row = self._session.scalar(
+ select(TaskStateModel).where(
+ TaskStateModel.dag_id == DAG_ID,
+ TaskStateModel.run_id == RUN_ID,
+ TaskStateModel.task_id == TASK_ID,
+ TaskStateModel.key == "k",
+ )
+ )
+ assert row is not None
+ assert row.value == expected_db
+
+ @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"],
"hello"])
+ def test_core_api_write_read_roundtrip(self, test_client, value):
+ """Core API write then Core API read returns the same native value."""
+ test_client.put(f"{BASE_URL}/k", json={"value": value})
+ assert test_client.get(f"{BASE_URL}/k").json()["value"] == value
+
+ @pytest.mark.parametrize("value", [42, True, {"rows": 100}, [1, "two"],
"hello"])
+ def test_worker_write_core_api_read_roundtrip(self, test_client, value):
+ """Worker write (json.dumps in DB) then Core API read returns native
value."""
+ _create_task_state(self._session, "k", value, self.dag_run)
+ self._session.commit()
+ assert test_client.get(f"{BASE_URL}/k").json()["value"] == value
+
def test_key_with_slash_is_supported(self, test_client):
response = test_client.put(f"{BASE_URL}/workflow/step_1",
json={"value": "v"})
assert response.status_code == 204
@@ -266,7 +313,7 @@ class TestClearTaskState(TestTaskStateEndpoint):
task_id=TASK_ID,
map_index=map_index,
key="job_id",
- value=f"app_{map_index}",
+ value=json.dumps(f"app_{map_index}"),
)
self._session.add(row)
self._session.commit()
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index f05fa65cf56..44ac8d39387 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -49,27 +49,6 @@ class AssetAliasResponse(BaseModel):
group: Annotated[str, Field(title="Group")]
-class AssetStateBody(BaseModel):
- """
- Request body for setting an asset state value.
- """
-
- model_config = ConfigDict(
- extra="forbid",
- )
- value: Annotated[str, Field(max_length=65535, title="Value")]
-
-
-class AssetStateResponse(BaseModel):
- """
- A single asset state key/value pair with metadata.
- """
-
- key: Annotated[str, Field(title="Key")]
- value: Annotated[str, Field(title="Value")]
- updated_at: Annotated[datetime, Field(title="Updated At")]
-
-
class AssetWatcherResponse(BaseModel):
"""
Asset watcher serializer for responses.
@@ -981,7 +960,7 @@ class TaskStateBody(BaseModel):
model_config = ConfigDict(
extra="forbid",
)
- value: Annotated[str, Field(max_length=65535, title="Value")]
+ value: JsonValue
class TaskStateResponse(BaseModel):
@@ -990,7 +969,7 @@ class TaskStateResponse(BaseModel):
"""
key: Annotated[str, Field(title="Key")]
- value: Annotated[str, Field(title="Value")]
+ value: JsonValue
updated_at: Annotated[datetime, Field(title="Updated At")]
expires_at: Annotated[datetime | None, Field(title="Expires At")] = None
@@ -1225,13 +1204,25 @@ class AssetResponse(BaseModel):
last_asset_event: LastAssetEventResponse | None = None
-class AssetStateCollectionResponse(BaseModel):
+class AssetStateBody(BaseModel):
"""
- All asset state entries for an asset.
+ Request body for setting an asset state value.
"""
- asset_states: Annotated[list[AssetStateResponse], Field(title="Asset
States")]
- total_entries: Annotated[int, Field(title="Total Entries")]
+ model_config = ConfigDict(
+ extra="forbid",
+ )
+ value: JsonValue
+
+
+class AssetStateResponse(BaseModel):
+ """
+ A single asset state key/value pair with metadata.
+ """
+
+ key: Annotated[str, Field(title="Key")]
+ value: JsonValue
+ updated_at: Annotated[datetime, Field(title="Updated At")]
class BackfillPostBody(BaseModel):
@@ -2017,6 +2008,15 @@ class AssetEventCollectionResponse(BaseModel):
total_entries: Annotated[int, Field(title="Total Entries")]
+class AssetStateCollectionResponse(BaseModel):
+ """
+ All asset state entries for an asset.
+ """
+
+ asset_states: Annotated[list[AssetStateResponse], Field(title="Asset
States")]
+ total_entries: Annotated[int, Field(title="Total Entries")]
+
+
class BackfillCollectionResponse(BaseModel):
"""
Backfill Collection serializer for responses.