Lee-W commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3263882082
##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -166,3 +173,41 @@ def cleanup(self) -> None:
retention policy. The backend is responsible for reading any relevant config (e.g.
``[state_store] default_retention_days``) and deciding what to delete.
"""
+
+ def serialize_task_state_value(self, *, value: str, key: str, ti_id: str) -> str:
+ """
+ Serialize a task state value before it is sent to the execution API for db persistence.
+
+ Called by ``TaskStateAccessor.set()`` on the worker. The return value is what gets
+ stored in the DB — typically a reference path (e.g. an S3 key) rather than the
+ actual value. Default: return ``value`` unchanged.
+ """
+ return value
+
+ def deserialize_task_state_value(self, stored: str) -> str:
+ """
+ Resolve a stored task state string back to the actual value.
+
+ Called by ``TaskStateAccessor.get()`` after the stored string is retrieved from
+ the execution API. Default: return ``stored`` unchanged.
+ """
+ return stored
+
+ def serialize_asset_state_value(self, *, value: str, key: str, asset_name: str) -> str:
Review Comment:
Without asset_url, do we still allow `Asset.ref(uri=)` here? Or do we always
fetch the asset model, read its name, and pass that in? What happens if there is
an inactive asset with the same name?
For example:
```python
Asset(name="abc", uri="abc") # inactive
Asset(name="abc", uri="abcde") # active
```
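To make the concern concrete, here is a minimal sketch of the ambiguity. It uses a hypothetical in-memory `AssetRecord` registry, not Airflow's real asset models, and the `active` flag and both lookup helpers are assumptions for illustration only:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AssetRecord:
    """Hypothetical stand-in for a stored asset row (not Airflow's model)."""

    name: str
    uri: str
    active: bool


# Two rows share the name "abc"; only the second one is active.
REGISTRY = [
    AssetRecord(name="abc", uri="abc", active=False),
    AssetRecord(name="abc", uri="abcde", active=True),
]


def resolve_by_name(name: str) -> list[AssetRecord]:
    """Name-only lookup: may return more than one row."""
    return [record for record in REGISTRY if record.name == name]


def resolve_active_by_name(name: str) -> AssetRecord:
    """Name lookup restricted to active rows; fails loudly if still ambiguous."""
    matches = [r for r in REGISTRY if r.name == name and r.active]
    if len(matches) != 1:
        raise LookupError(f"expected one active asset named {name!r}, got {len(matches)}")
    return matches[0]
```

If only `asset_name` reaches the backend, something upstream has to make a choice like the second helper does, and it would be good to document which one it is.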
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -434,11 +435,25 @@ def get(self, key, default: Any = NOTSET) -> Any:
raise
+@cache
+def _get_worker_state_backend():
Review Comment:
```suggestion
def _get_worker_state_backend() -> BaseStateBackend:
```
##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -166,3 +173,41 @@ def cleanup(self) -> None:
retention policy. The backend is responsible for reading any relevant config (e.g.
``[state_store] default_retention_days``) and deciding what to delete.
"""
+
+ def serialize_task_state_value(self, *, value: str, key: str, ti_id: str) -> str:
+ """
+ Serialize a task state value before it is sent to the execution API for db persistence.
+
+ Called by ``TaskStateAccessor.set()`` on the worker. The return value is what gets
+ stored in the DB — typically a reference path (e.g. an S3 key) rather than the
+ actual value. Default: return ``value`` unchanged.
+ """
+ return value
+
+ def deserialize_task_state_value(self, stored: str) -> str:
+ """
+ Resolve a stored task state string back to the actual value.
+
+ Called by ``TaskStateAccessor.get()`` after the stored string is retrieved from
+ the execution API. Default: return ``stored`` unchanged.
+ """
+ return stored
+
+ def serialize_asset_state_value(self, *, value: str, key: str, asset_name: str) -> str:
Review Comment:
I remember there was a cleanup mechanism in the previous PR, but what happens
if the cleanup does not run in time?
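One failure mode I have in mind, sketched with a hypothetical in-memory backend. The `InMemoryOffloadBackend` class, the `mem://` reference format, and the `db_rows` dict are all assumptions standing in for a real object store and the DB:

```python
class InMemoryOffloadBackend:
    """Hypothetical backend that offloads values and stores only a reference."""

    def __init__(self) -> None:
        self.blobs: dict[str, str] = {}  # stands in for e.g. an object store

    def serialize_task_state_value(self, *, value: str, key: str, ti_id: str) -> str:
        ref = f"mem://{ti_id}/{key}"
        self.blobs[ref] = value  # the payload lives outside the DB
        return ref  # only the reference is persisted

    def deserialize_task_state_value(self, stored: str) -> str:
        return self.blobs[stored]


backend = InMemoryOffloadBackend()
db_rows: dict[tuple[str, str], str] = {}  # stands in for the state table

ref = backend.serialize_task_state_value(value="payload", key="cursor", ti_id="ti-1")
db_rows[("ti-1", "cursor")] = ref

# The DB row is deleted, but backend cleanup has not run yet.
del db_rows[("ti-1", "cursor")]

# The payload is still held by the backend with no row pointing at it.
orphans = set(backend.blobs) - set(db_rows.values())
```

Until cleanup runs, the offloaded payload stays in the external store with nothing in the DB referencing it, so it would help to state what guarantees we give here.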