jason810496 commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3264258642
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -495,13 +515,26 @@ def set(self, key: str, value: str, *, retention:
timedelta | None = None) -> No
else:
days = conf.getint("state_store", "default_retention_days")
expires_at = None if days <= 0 else now + timedelta(days=days)
- SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key,
value=value, expires_at=expires_at))
+
+ # if custom backend is configured, store the value on the custom
backend, and return the reference
+ # to the stored value to store in the DB
+ backend = _get_worker_state_backend()
+ stored = (
+ backend.serialize_task_state_value(value=value, key=key,
ti_id=str(self._ti_id))
+ if backend
+ else value
+ )
+
+ SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key,
value=stored, expires_at=expires_at))
def delete(self, key: str) -> None:
"""Delete a single key. No-op if the key does not exist."""
from airflow.sdk.execution_time.comms import DeleteTaskState
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
+ backend = _get_worker_state_backend()
+ if backend is not None:
Review Comment:
How about adding a property for the worker backend, so that methods like `set` and `delete` don't each have to call `_get_worker_state_backend()`?
```suggestion
if self._worker_backend:
self._worker_backend.delete(self._scope, key)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]