This is an automated email from the ASF dual-hosted git repository.
amoghrajesh pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new b311e6fb453 Pass task/asset scopes to serialize methods instead of
ti_id/asset_ref (#68274)
b311e6fb453 is described below
commit b311e6fb453a644d9d9c0e55f248786815f999d1
Author: Amogh Desai <[email protected]>
AuthorDate: Wed Jun 10 08:48:14 2026 +0530
Pass task/asset scopes to serialize methods instead of ti_id/asset_ref
(#68274)
---
.../task-and-asset-store.rst | 21 ++++++------
shared/state/src/airflow_shared/state/__init__.py | 11 +++---
shared/state/tests/state/test_state.py | 40 ++++++++++++----------
task-sdk/src/airflow/sdk/execution_time/context.py | 7 ++--
.../tests/task_sdk/execution_time/test_context.py | 14 ++++----
.../task_sdk/execution_time/test_task_runner.py | 14 +++++---
6 files changed, 59 insertions(+), 48 deletions(-)
diff --git
a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst
b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst
index 5cf54115e46..6e261a266dc 100644
--- a/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst
+++ b/airflow-core/docs/administration-and-deployment/task-and-asset-store.rst
@@ -157,12 +157,12 @@ Override four serialization hooks from
:class:`~airflow.sdk.state.BaseStoreBacke
* ``serialize_task_store_to_ref``: called by ``TaskStoreAccessor.set()``
before the value is sent to the Execution API; return a compact reference
string (e.g. an S3 key) to be stored in the database instead of the raw value.
* ``deserialize_task_store_from_ref``: called by ``TaskStoreAccessor.get()``
after retrieving the reference from the backend; return the actual value.
-* ``serialize_asset_store_to_ref``: same as the task variant but for asset
store; receives the asset name or URI as ``asset_ref``.
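+* ``serialize_asset_store_to_ref``: same as the task variant but for the asset
store; receives the asset scope as ``scope`` (an
:class:`~airflow.sdk.state.AssetScope` with ``name`` and/or ``uri``; either
field may be ``None`` depending on how the accessor was constructed, so fall
back from one to the other when building a reference).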
* ``deserialize_asset_store_from_ref``: called by ``AssetStoreAccessor.get()``
to resolve the stored reference back to the actual value.
.. important::
- **References must be deterministic.** Given the same inputs (``ti_id`` +
``key`` for task store; ``asset_ref`` + ``key`` for asset store), the
serialization method must always return the same reference string. Do not embed
timestamps, random UUIDs, or any other non-deterministic component in the
reference path.
+ **References must be deterministic.** Given the same inputs (``scope`` +
``key``), the serialization method must always return the same reference
string. Do not embed timestamps, random UUIDs, or any other non-deterministic
component in the reference path.
When a key is deleted or cleared, Airflow clears the database reference
*first*, then calls the backend's ``delete()`` or ``clear()`` method. If
backend cleanup fails after the DB row is gone, the external object is
orphaned. Because the reference is deterministic, a subsequent ``set()`` for
the same key will overwrite the orphaned object, making the failure
recoverable. A non-deterministic reference would leave the external object
permanently orphaned with no way to locate it.
@@ -178,17 +178,18 @@ Example skeleton:
class S3StateBackend(BaseStoreBackend):
- def _task_ref(self, ti_id: str, key: str) -> str:
- return f"airflow/task-store/{ti_id}/{key}"
+ def _task_ref(self, scope: TaskScope, key: str) -> str:
+ return
f"airflow/task-store/{scope.dag_id}/{scope.run_id}/{scope.task_id}/{scope.map_index}/{key}"
- def _asset_ref(self, asset_ref: str, key: str) -> str:
+ def _asset_ref(self, scope: AssetScope, key: str) -> str:
import hashlib
- safe = hashlib.sha256(asset_ref.encode()).hexdigest()[:16]
+ asset_identifier = scope.name or scope.uri or ""
+ safe = hashlib.sha256(asset_identifier.encode()).hexdigest()[:16]
return f"airflow/asset-store/{safe}/{key}"
- def serialize_task_store_to_ref(self, *, value: JsonValue, key: str,
ti_id: str) -> str:
- s3_key = self._task_ref(ti_id, key)
+ def serialize_task_store_to_ref(self, *, value: JsonValue, key: str,
scope: TaskScope) -> str:
+ s3_key = self._task_ref(scope, key)
s3_client.put_object(Bucket=BUCKET, Key=s3_key,
Body=json.dumps(value).encode())
return s3_key
@@ -196,8 +197,8 @@ Example skeleton:
s3_object = s3_client.get_object(Bucket=BUCKET, Key=stored)
return json.loads(s3_object["Body"].read().decode())
- def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str,
asset_ref: str) -> str:
- s3_key = self._asset_ref(asset_ref, key)
+ def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str,
scope: AssetScope) -> str:
+ s3_key = self._asset_ref(scope, key)
s3_client.put_object(Bucket=BUCKET, Key=s3_key,
Body=json.dumps(value).encode())
return s3_key
diff --git a/shared/state/src/airflow_shared/state/__init__.py
b/shared/state/src/airflow_shared/state/__init__.py
index 3c963acd18d..cd2c4b3172e 100644
--- a/shared/state/src/airflow_shared/state/__init__.py
+++ b/shared/state/src/airflow_shared/state/__init__.py
@@ -247,7 +247,7 @@ class BaseStoreBackend(ABC):
``[state_store] default_retention_days``) and deciding what to delete.
"""
- def serialize_task_store_to_ref(self, *, value: JsonValue, key: str,
ti_id: str) -> str:
+ def serialize_task_store_to_ref(self, *, value: JsonValue, key: str,
scope: TaskScope) -> str:
"""
Serialize a task store value before it is sent to the execution API
for db persistence.
@@ -260,7 +260,7 @@ class BaseStoreBackend(ABC):
that wrapper before passing ``stored`` to
``deserialize_task_store_from_ref()``. Do not
wrap the reference yourself.
- The returned reference must be deterministic — given the same
``ti_id`` and ``key`` it
+ The returned reference must be deterministic — given the same
``scope`` and ``key`` it
must always return the same string. Do not use timestamps or random
UUIDs as part of
the reference, otherwise ``delete()``/``clear()`` cannot reconstruct
it and the external
object will be orphaned. By default, it JSON dumps the value and
returns a JSON string.
@@ -277,7 +277,7 @@ class BaseStoreBackend(ABC):
"""
return json.loads(stored)
- def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str,
asset_ref: str) -> str:
+ def serialize_asset_store_to_ref(self, *, value: JsonValue, key: str,
scope: AssetScope) -> str:
"""
Serialize an asset store value before it is sent to the Execution API
for db persistence.
@@ -290,10 +290,11 @@ class BaseStoreBackend(ABC):
that wrapper before passing ``stored`` to
``deserialize_asset_store_from_ref()``. Do not
wrap the reference yourself.
- ``asset_ref`` is either the asset name or URI, depending on how the
accessor was
- constructed. It may be a URI string if the task inlet was declared as
``AssetUriRef``.
-
- The returned reference must be deterministic — given the same
``asset_ref`` and ``key`` it
+ ``scope`` carries the asset ``name`` and/or ``uri``, depending on how the
+ accessor was constructed. ``name`` may be ``None`` (e.g. when the task
+ inlet was declared as ``AssetUriRef``), so fall back to ``uri`` as needed.
+
+ The returned reference must be deterministic — given the same
``scope`` and ``key`` it
must always return the same string. Do not use timestamps or random
UUIDs as part of
the reference, otherwise ``delete()``/``clear()`` cannot reconstruct
it and the external
object will be orphaned. By default, it JSON dumps the value and
returns a JSON string.
diff --git a/shared/state/tests/state/test_state.py
b/shared/state/tests/state/test_state.py
index b527b4fc6a1..08d6c74a55f 100644
--- a/shared/state/tests/state/test_state.py
+++ b/shared/state/tests/state/test_state.py
@@ -18,7 +18,7 @@ from __future__ import annotations
import pytest
-from airflow_shared.state import AssetScope, BaseStoreBackend, StoreScope
+from airflow_shared.state import AssetScope, BaseStoreBackend, StoreScope,
TaskScope
class TestAssetScope:
@@ -88,20 +88,22 @@ class TestBaseStoreBackend:
def test_task_store_serialize_deserialize_round_trip(self, backend):
original = "app_1234"
- serialized = backend.serialize_task_store_to_ref(value=original,
key="job_id", ti_id="abc-123")
+ scope = TaskScope(dag_id="d", run_id="r", task_id="t", map_index=-1)
+ serialized = backend.serialize_task_store_to_ref(value=original,
key="job_id", scope=scope)
deserialized = backend.deserialize_task_store_from_ref(serialized)
assert deserialized == original
def test_task_store_serialize_deserialize_typed_values(self, backend):
"""Default backend passes typed values through unchanged (custom
backends handle storage)."""
+ scope = TaskScope(dag_id="d", run_id="r", task_id="t", map_index=-1)
assert (
backend.deserialize_task_store_from_ref(
- backend.serialize_task_store_to_ref(value=42, key="count",
ti_id="abc-123")
+ backend.serialize_task_store_to_ref(value=42, key="count",
scope=scope)
)
== 42
)
assert backend.deserialize_task_store_from_ref(
- backend.serialize_task_store_to_ref(value={"status": "ok"},
key="result", ti_id="abc-123")
+ backend.serialize_task_store_to_ref(value={"status": "ok"},
key="result", scope=scope)
) == {"status": "ok"}
def test_custom_backend_overrides_task_store_ser_deser(self):
@@ -115,38 +117,39 @@ class TestBaseStoreBackend:
async def adelete(self, scope, key): ...
async def aclear(self, scope, *, all_map_indices=False): ...
- def serialize_task_store_to_ref(self, *, value, key, ti_id):
- return f"s3://bucket/{ti_id}/{key}"
+ def serialize_task_store_to_ref(self, *, value, key, scope:
TaskScope):
+ return f"s3://bucket/{scope.dag_id}/{scope.task_id}/{key}"
def deserialize_task_store_from_ref(self, stored):
return f"fetched:{stored}"
b = MyBackend()
- assert b.serialize_task_store_to_ref(value="app_1234", key="job_id",
ti_id="abc-123") == (
- "s3://bucket/abc-123/job_id"
+ scope = TaskScope(dag_id="my_dag", run_id="r", task_id="my_task",
map_index=-1)
+ assert b.serialize_task_store_to_ref(value="app_1234", key="job_id",
scope=scope) == (
+ "s3://bucket/my_dag/my_task/job_id"
)
assert (
- b.deserialize_task_store_from_ref("s3://bucket/abc-123/job_id")
- == "fetched:s3://bucket/abc-123/job_id"
+
b.deserialize_task_store_from_ref("s3://bucket/my_dag/my_task/job_id")
+ == "fetched:s3://bucket/my_dag/my_task/job_id"
)
def test_asset_store_serialize_deserialize_round_trip(self, backend):
original = "2026-05-01"
- serialized = backend.serialize_asset_store_to_ref(
- value="2026-05-01", key="watermark", asset_ref="my_asset"
- )
+ scope = AssetScope(name="my_asset")
+ serialized = backend.serialize_asset_store_to_ref(value="2026-05-01",
key="watermark", scope=scope)
deserialized = backend.deserialize_asset_store_from_ref(serialized)
assert deserialized == original
def test_asset_store_serialize_deserialize_typed_values(self, backend):
+ scope = AssetScope(name="my_asset")
assert (
backend.deserialize_asset_store_from_ref(
- backend.serialize_asset_store_to_ref(value=5,
key="total_runs", asset_ref="my_asset")
+ backend.serialize_asset_store_to_ref(value=5,
key="total_runs", scope=scope)
)
== 5
)
assert backend.deserialize_asset_store_from_ref(
- backend.serialize_asset_store_to_ref(value={"rows": 1234},
key="last_run", asset_ref="my_asset")
+ backend.serialize_asset_store_to_ref(value={"rows": 1234},
key="last_run", scope=scope)
) == {"rows": 1234}
def test_custom_backend_overrides_asset_store_ser_deser(self):
@@ -160,14 +163,15 @@ class TestBaseStoreBackend:
async def adelete(self, scope, key): ...
async def aclear(self, scope, *, all_map_indices=False): ...
- def serialize_asset_store_to_ref(self, *, value, key, asset_ref):
- return f"s3://bucket/assets/{asset_ref}/{key}"
+ def serialize_asset_store_to_ref(self, *, value, key, scope:
AssetScope):
+ return f"s3://bucket/assets/{scope.name}/{key}"
def deserialize_asset_store_from_ref(self, stored):
return f"resolved:{stored}"
b = MyBackend()
- assert b.serialize_asset_store_to_ref(value="2026-05-01",
key="watermark", asset_ref="my_asset") == (
+ scope = AssetScope(name="my_asset")
+ assert b.serialize_asset_store_to_ref(value="2026-05-01",
key="watermark", scope=scope) == (
"s3://bucket/assets/my_asset/watermark"
)
assert (
diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py
b/task-sdk/src/airflow/sdk/execution_time/context.py
index bbaff215c9b..aae3c0747ea 100644
--- a/task-sdk/src/airflow/sdk/execution_time/context.py
+++ b/task-sdk/src/airflow/sdk/execution_time/context.py
@@ -30,6 +30,7 @@ from uuid import UUID
import attrs
import structlog
+from airflow.sdk._shared.state import AssetScope
from airflow.sdk.configuration import conf
from airflow.sdk.definitions._internal.contextmanager import _CURRENT_CONTEXT
from airflow.sdk.definitions._internal.types import NOTSET
@@ -595,7 +596,7 @@ class TaskStoreAccessor:
backend = _get_worker_state_store_backend()
stored: JsonValue = value
if backend is not None:
- ref: str = backend.serialize_task_store_to_ref(value=value,
key=key, ti_id=str(self._ti_id))
+ ref: str = backend.serialize_task_store_to_ref(value=value,
key=key, scope=self._scope)
# wrap the value with a marker to indicate that it's stored
externally, and include the ref to the external storage
stored = _wrap_external_ref(ref)
@@ -717,10 +718,10 @@ class AssetStoreAccessor:
# if custom backend is configured, store the value on the custom
backend, and return the reference
# to the stored value to store in the DB
backend = _get_worker_state_store_backend()
- asset_ref = self._name or self._uri or ""
stored: JsonValue = value
if backend is not None:
- ref = backend.serialize_asset_store_to_ref(value=value, key=key,
asset_ref=asset_ref)
+ scope = AssetScope(name=self._name, uri=self._uri)
+ ref = backend.serialize_asset_store_to_ref(value=value, key=key,
scope=scope)
stored = _wrap_external_ref(ref)
msg: ToSupervisor
diff --git a/task-sdk/tests/task_sdk/execution_time/test_context.py
b/task-sdk/tests/task_sdk/execution_time/test_context.py
index 1e668fb9ffd..c1c7ad1626b 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_context.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_context.py
@@ -1619,8 +1619,8 @@ class InMemoryStoreBackend(BaseStoreBackend):
self._actual_key_value_store: dict[str, str] = {} # key -> actual
value
self.reference: dict[str, str] = {} # key -> stored ref (mem:// URI)
- def serialize_task_store_to_ref(self, *, value, key: str, ti_id: str) ->
str:
- ref = f"mem://{ti_id}/{key}"
+ def serialize_task_store_to_ref(self, *, value, key: str, scope) -> str:
+ ref =
f"mem://{scope.dag_id}/{scope.run_id}/{scope.task_id}/{scope.map_index}/{key}"
self._actual_key_value_store[key] = value
self.reference[key] = ref
return ref
@@ -1629,8 +1629,8 @@ class InMemoryStoreBackend(BaseStoreBackend):
key = stored.rsplit("/", 1)[-1]
return self._actual_key_value_store.get(key, stored)
- def serialize_asset_store_to_ref(self, *, value, key: str, asset_ref: str)
-> str:
- ref = f"mem://{asset_ref}/{key}"
+ def serialize_asset_store_to_ref(self, *, value, key: str, scope) -> str:
+ ref = f"mem://{scope.name or scope.uri}/{key}"
self._actual_key_value_store[key] = value
self.reference[key] = ref
return ref
@@ -1672,7 +1672,7 @@ class TestTaskStoreAccessorWithCustomBackend:
def test_set_returns_reference_to_storage(self, mock_supervisor_comms,
backend, time_machine):
"""set() stores actual value in backend and sends mem:// reference via
comms."""
mock_supervisor_comms.send.return_value = OKResponse(ok=True)
- expected_ref = f"mem://{self.TI_ID}/job_id"
+ expected_ref =
f"mem://{self.SCOPE.dag_id}/{self.SCOPE.run_id}/{self.SCOPE.task_id}/{self.SCOPE.map_index}/job_id"
frozen_dt = datetime(2026, 1, 1, 12, 0, 0, tzinfo=dt_timezone.utc)
time_machine.move_to(frozen_dt, tick=False)
@@ -1693,7 +1693,9 @@ class TestTaskStoreAccessorWithCustomBackend:
def test_get_resolves_reference_to_actual_value(self,
mock_supervisor_comms, backend):
"""get() fetches mem:// reference from DB, resolves it to actual value
via backend."""
- ref = _wrap_external_ref(f"mem://{self.TI_ID}/job_id")
+ ref = _wrap_external_ref(
+
f"mem://{self.SCOPE.dag_id}/{self.SCOPE.run_id}/{self.SCOPE.task_id}/{self.SCOPE.map_index}/job_id"
+ )
backend._actual_key_value_store["job_id"] = "app_001"
mock_supervisor_comms.send.return_value = TaskStoreResult(value=ref)
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index 1968de74973..5bf2434fba4 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -62,7 +62,7 @@ from airflow.sdk import (
timezone,
)
from airflow.sdk._shared.observability.metrics.base_stats_logger import
StatsLogger
-from airflow.sdk._shared.state import TaskScope
+from airflow.sdk._shared.state import AssetScope, TaskScope
from airflow.sdk.api.datamodels._generated import (
AssetProfile,
AssetResponse,
@@ -5817,7 +5817,7 @@ class TestTaskInstanceStateOperations:
run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
mock_backend.serialize_asset_store_to_ref.assert_called_once_with(
- value="2026-05-01", key="watermark", asset_ref="my_asset"
+ value="2026-05-01", key="watermark",
scope=AssetScope(name="my_asset", uri=None)
)
mock_supervisor_comms.send.assert_any_call(
SetAssetStoreByName(
@@ -5843,7 +5843,13 @@ class TestTaskInstanceStateOperations:
mock_supervisor_comms.send.side_effect =
TestTaskInstanceStateOperations._watcher_side_effect
mock_backend = mock.MagicMock()
- ref = f"mem://{runtime_ti.id}/job_id"
+ scope = TaskScope(
+ dag_id=runtime_ti.dag_id,
+ run_id=runtime_ti.run_id,
+ task_id=runtime_ti.task_id,
+ map_index=runtime_ti.map_index,
+ )
+ ref =
f"mem://{scope.dag_id}/{scope.run_id}/{scope.task_id}/{scope.map_index}/job_id"
mock_backend.serialize_task_store_to_ref.return_value = ref
with mock.patch(
@@ -5852,7 +5858,7 @@ class TestTaskInstanceStateOperations:
run(runtime_ti, context=runtime_ti.get_template_context(),
log=mock.MagicMock())
mock_backend.serialize_task_store_to_ref.assert_called_once_with(
- value="app_001", key="job_id", ti_id=str(runtime_ti.id)
+ value="app_001", key="job_id", scope=scope
)
mock_supervisor_comms.send.assert_any_call(
SetTaskStore(