amoghrajesh commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3271802361
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -495,13 +519,26 @@ def set(self, key: str, value: str, *, retention: timedelta | None = None) -> No
else:
days = conf.getint("state_store", "default_retention_days")
expires_at = None if days <= 0 else now + timedelta(days=days)
- SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=value, expires_at=expires_at))
+
+ # if custom backend is configured, store the value on the custom backend, and return the reference
+ # to the stored value to store in the DB
+ backend = _get_worker_state_backend()
+ stored = (
+ backend.serialize_task_state_to_ref(value=value, key=key, ti_id=str(self._ti_id))
+ if backend
+ else value
+ )
+
+ SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at))
def delete(self, key: str) -> None:
"""Delete a single key. No-op if the key does not exist."""
from airflow.sdk.execution_time.comms import DeleteTaskState
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+ backend = _get_worker_state_backend()
+ if backend is not None:
Review Comment:
Handled in 59e33f7b39
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -598,9 +659,21 @@ def delete(self, key: str) -> None:
def clear(self) -> None:
"""Delete all state keys for this asset."""
- from airflow.sdk.execution_time.comms import ClearAssetStateByName, ClearAssetStateByUri, ToSupervisor
+ from airflow.sdk._shared.state import AssetScope
+ from airflow.sdk.execution_time.comms import (
+ ClearAssetStateByName,
+ ClearAssetStateByUri,
+ ToSupervisor,
+ )
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+ backend = _get_worker_state_backend()
+ # custom backends handle external storage cleanup only;
Review Comment:
Handled in 59e33f7b39
##########
shared/state/src/airflow_shared/state/__init__.py:
##########
@@ -47,9 +47,21 @@ class TaskScope:
@dataclass(frozen=True)
class AssetScope:
- """Identifies the state namespace for an asset."""
+ """
+ Identifies the state namespace for an asset.
+
+ Server-side backends receive ``asset_id``. Worker-side backends receive ``name`` or ``uri``
+ since workers do not have access to the integer ``asset_id``.
+
+ Note: ``name`` and ``uri`` are not guaranteed to be unique over time — if an asset is
+ deactivated and a new one created with the same name, both share the same ``name`` value.
+ State for inactive assets is cleaned up by the orphan GC pass; until then, stale rows exist
+ in the DB but cannot be written to (the Execution API resolver filters to active assets only).
+ """
- asset_id: int
+ asset_id: int | None = None
Review Comment:
Handled in 59e33f7b39
##########
airflow-core/src/airflow/config_templates/config.yml:
##########
@@ -1889,6 +1889,16 @@ workers:
sensitive: true
example: ~
default: ""
+ state_backend:
+ description: |
+ Full class name of the state backend to use on workers for direct task state access,
Review Comment:
Handled in 59e33f7b39
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]