amoghrajesh commented on code in PR #67530:
URL: https://github.com/apache/airflow/pull/67530#discussion_r3308826117


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -512,14 +512,13 @@ def get(self, key: str) -> JsonValue:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, TaskStateResult):
             stored = resp.value
-            # if custom backend is configured, the stored value in DB is a 
reference, fetch the actual value from
-            # custom backend using the reference
             backend = _get_worker_state_backend()
-            if backend is not None:
-                # serialize_task_state_to_ref always returns str by contract; 
stored contains the ref.
+            if backend is not None and isinstance(stored, dict) and 
stored.get("__type") == "ExternalState":
+                # unwrap the marker to get the ref, and retrieve the actual 
value from the backend using the ref
+                ref = stored["__var"]

Review Comment:
   This is now resolved by the envelope change from previous comment. The read 
path checks `_EXTERNAL_STATE_REF_KEY in stored` before accessing the value, so 
a malformed envelope can't produce a bare KeyError. And since the new shape is 
a single key (`{"__airflow_state_ref__": ref}`), there is no separate field 
that could be missing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to