amoghrajesh commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3264729983


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -495,13 +515,26 @@ def set(self, key: str, value: str, *, retention: 
timedelta | None = None) -> No
         else:
             days = conf.getint("state_store", "default_retention_days")
             expires_at = None if days <= 0 else now + timedelta(days=days)
-        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, 
value=value, expires_at=expires_at))
+
+        # if custom backend is configured, store the value on the custom 
backend, and return the reference
+        # to the stored value to store in the DB
+        backend = _get_worker_state_backend()
+        stored = (
+            backend.serialize_task_state_value(value=value, key=key, 
ti_id=str(self._ti_id))
+            if backend
+            else value
+        )
+
+        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, 
value=stored, expires_at=expires_at))
 
     def delete(self, key: str) -> None:
         """Delete a single key. No-op if the key does not exist."""
         from airflow.sdk.execution_time.comms import DeleteTaskState
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        backend = _get_worker_state_backend()
+        if backend is not None:

Review Comment:
   Since `_get_worker_state_backend()` is cached, it's already computed once 
globally which is functionally the same as a property. 
   
   Keeping the current approach to avoid coupling the backend lifecycle to 
individual accessor instances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to