jason810496 commented on code in PR #66859:
URL: https://github.com/apache/airflow/pull/66859#discussion_r3264258642


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -495,13 +515,26 @@ def set(self, key: str, value: str, *, retention: timedelta | None = None) -> No
         else:
             days = conf.getint("state_store", "default_retention_days")
             expires_at = None if days <= 0 else now + timedelta(days=days)
-        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=value, expires_at=expires_at))
+
+        # if custom backend is configured, store the value on the custom backend, and return the reference
+        # to the stored value to store in the DB
+        backend = _get_worker_state_backend()
+        stored = (
+            backend.serialize_task_state_value(value=value, key=key, ti_id=str(self._ti_id))
+            if backend
+            else value
+        )
+
+        SUPERVISOR_COMMS.send(SetTaskState(ti_id=self._ti_id, key=key, value=stored, expires_at=expires_at))
 
     def delete(self, key: str) -> None:
         """Delete a single key. No-op if the key does not exist."""
         from airflow.sdk.execution_time.comms import DeleteTaskState
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
+        backend = _get_worker_state_backend()
+        if backend is not None:

Review Comment:
   How about exposing the worker backend as a property (e.g. `self._worker_backend`) instead of calling `_get_worker_state_backend()` in each method?
   
   
   ```suggestion
           if self._worker_backend:
               self._worker_backend.delete(self._scope, key)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to