kaxil commented on code in PR #67530:
URL: https://github.com/apache/airflow/pull/67530#discussion_r3307394111


##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -512,14 +512,13 @@ def get(self, key: str) -> JsonValue:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, TaskStateResult):
             stored = resp.value
-            # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
-            # custom backend using the reference
             backend = _get_worker_state_backend()
-            if backend is not None:
-                # serialize_task_state_to_ref always returns str by contract; stored contains the ref.
+            if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
+                # unwrap the marker to get the ref, and retrieve the actual value from the backend using the ref
+                ref = stored["__var"]
                 if TYPE_CHECKING:
-                    assert isinstance(stored, str)
-                return backend.deserialize_task_state_from_ref(stored)
+                    assert isinstance(ref, str)
+                return backend.deserialize_task_state_from_ref(ref)
             return stored

Review Comment:
   When `_get_worker_state_backend()` returns a backend but `stored` is not an 
`ExternalState` envelope (`not isinstance(stored, dict)`, missing `__type`, or 
`__type != "ExternalState"`), this falls through to `return stored` on line 522 
-- the raw value is returned to the user as if no backend were configured. This 
matters during rolling upgrades / mixed-version workers: a row written by an 
older worker (pre-#67530) stored the raw ref string directly (no envelope). 
After upgrade, the new worker reads it back, fails the `isinstance(stored, 
dict)` check, and returns the *ref string itself* to user code rather than 
dereferencing it through the backend. The pre-#67530 code path explicitly 
called `backend.deserialize_task_state_from_ref(stored)` in that case.
   
   Can you confirm the intended behavior here? If pre-envelope rows aren't 
expected to exist (fresh schema with no migration), worth a comment or a log 
warning so the silent fallthrough doesn't mask a future bug. If pre-envelope 
rows *can* exist (rolling deploy), this needs a fallback to the old 
`deserialize_task_state_from_ref(stored)` path for `isinstance(stored, str)`.
   
   Same issue at line 656 for asset state.
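   A minimal sketch of the backwards-compat branch, written as a standalone function rather than the real `TaskStateAccessor.get()`. `read_task_state`, `TaskStateBackend` and `DictBackend` are hypothetical names. The sketch assumes that any bare `str` row read while a backend is configured is a legacy ref, which matches the pre-#67530 behaviour. It cannot tell a legacy ref apart from a plain string value written before the backend was enabled.

```python
from __future__ import annotations

from typing import Any, Protocol


class TaskStateBackend(Protocol):
    """Hypothetical protocol: only the read method this sketch needs."""

    def deserialize_task_state_from_ref(self, ref: str) -> Any: ...


def read_task_state(stored: Any, backend: TaskStateBackend | None) -> Any:
    """Hypothetical read path that also dereferences pre-envelope (legacy) rows."""
    if backend is None:
        # No backend: the DB row holds the value itself.
        return stored
    if isinstance(stored, dict) and stored.get("__type") == "ExternalState":
        # Post-#67530 row: unwrap the envelope and dereference the ref.
        return backend.deserialize_task_state_from_ref(stored["__var"])
    if isinstance(stored, str):
        # Pre-#67530 row: the bare ref string was stored without an envelope.
        return backend.deserialize_task_state_from_ref(stored)
    # Any other shape was not written through the backend; return it unchanged.
    return stored


class DictBackend:
    """Toy backend for the usage example: refs map to values in a dict."""

    def __init__(self, data: dict[str, Any]) -> None:
        self.data = data

    def deserialize_task_state_from_ref(self, ref: str) -> Any:
        return self.data[ref]


backend = DictBackend({"ref-1": {"rows": 3}})
print(read_task_state({"__type": "ExternalState", "__var": "ref-1"}, backend))  # {'rows': 3}
print(read_task_state("ref-1", backend))  # {'rows': 3}
```

   The asset-state path would need the same branch, calling `deserialize_asset_state_from_ref` instead.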



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -512,14 +512,13 @@ def get(self, key: str) -> JsonValue:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, TaskStateResult):
             stored = resp.value
-            # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
-            # custom backend using the reference
             backend = _get_worker_state_backend()
-            if backend is not None:
-                # serialize_task_state_to_ref always returns str by contract; stored contains the ref.
+            if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":

Review Comment:
   The envelope shape `{"__type", "__var"}` aliases the wire format that 
`airflow-core/src/airflow/serialization/serialized_objects.py` uses for 
serializing complex objects (`OLD_TYPE = "__type"`, `OLD_DATA = "__var"`). 
Reusing those keys here creates two classes of collisions worth thinking 
through.
   
   First, user-supplied dicts with the same shape: a user who does 
`task_state.set("k", {"__type": "ExternalState", "__var": "some-string"})` 
will, on read, have their dict misinterpreted as a backend ref and passed to 
`backend.deserialize_task_state_from_ref("some-string")`. There's no 
`FORBIDDEN_XCOM_KEYS`-equivalent guard on `task_state.set()` to block this. 
Even without a backend the value still appears legitimate, so the trap is 
silent until the deployment turns a backend on.
   
   Second, cross-mixing with `BaseSerialization`: if anything downstream (UI 
renderer, audit log, copying state to XCom) ever calls 
`BaseSerialization.deserialize` on this dict, it will try to resolve 
`"ExternalState"` as a registered class.
   
   The PR title mentions "for UI clarity" -- and the UI is the *consumer* of 
this envelope. So the question is: what's the rationale for adopting 
`__type`/`__var` (Airflow's internal serialization envelope) here, vs. a 
distinct namespace like `{"__airflow_state_ref__": "..."}` that can't collide 
with either user values or `BaseSerialization`? If it's intentional alignment 
with the UI's existing renderer, worth a comment pointing to that. If it's 
incidental, the namespace collision is worth resolving now while there's no 
on-disk data.



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -512,14 +512,13 @@ def get(self, key: str) -> JsonValue:
             raise AirflowRuntimeError(resp)
         if isinstance(resp, TaskStateResult):
             stored = resp.value
-            # if custom backend is configured, the stored value in DB is a reference, fetch the actual value from
-            # custom backend using the reference
             backend = _get_worker_state_backend()
-            if backend is not None:
-                # serialize_task_state_to_ref always returns str by contract; stored contains the ref.
+            if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
+                # unwrap the marker to get the ref, and retrieve the actual value from the backend using the ref
+                ref = stored["__var"]

Review Comment:
   `stored["__var"]` raises a bare `KeyError` if a malformed envelope (e.g. 
`{"__type": "ExternalState"}` with no `__var`) ever lands in the DB -- corrupt 
row, partial write, schema drift on a future backend. Bare `KeyError` 
propagating out of `TaskStateAccessor.get()` will be hard to diagnose from the 
user's traceback.
   
   Minor: `ref = stored.get("__var")` plus an explicit `if not isinstance(ref, 
str): raise AirflowRuntimeError(...)` (or similar) gives a typed, identifiable 
error. Same on line 652 for the asset-state path.



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -548,12 +547,12 @@ def set(self, key: str, value: JsonValue, *, retention: timedelta | None = None)
 
         # if custom backend is configured, store the value on the custom backend, and return the reference
         # to the stored value to store in the DB
+        stored: JsonValue = value
         backend = _get_worker_state_backend()
-        stored = (
-            backend.serialize_task_state_to_ref(value=value, key=key, ti_id=str(self._ti_id))
-            if backend
-            else value
-        )
+        if backend is not None:
+            # decorate the value with a marker to indicate that it's stored externally, and include the ref to the external storage
+            ref = backend.serialize_task_state_to_ref(value=value, key=key, ti_id=str(self._ti_id))
+            stored = {"__type": "ExternalState", "__var": ref}

Review Comment:
   The string literals `"__type"`, `"__var"`, and `"ExternalState"` appear four times across this file (516/518, 555, 650/652, 670). Together they form the wire contract between the worker SDK and any downstream consumer (UI, future server-side reader).

   Consider hoisting them to module-level constants, e.g. `_EXTERNAL_STATE_TYPE = "ExternalState"`, `_EXTERNAL_STATE_TYPE_KEY = "__type"`, `_EXTERNAL_STATE_REF_KEY = "__var"`, and writing small helpers such as `_wrap_external_ref(ref) -> dict` / `_unwrap_external_ref(stored) -> str | None`. This has three benefits:
   - A single typo (`"__var"` -> `"_var"`, `"ExternalState"` -> `"ExternalSate"`) can no longer silently break the round-trip in one direction.
   - The contract becomes visible to the UI / server-side renderer that consumes this format.
   - The helpers are the natural home for the malformed-envelope guard from the comment on line 518.



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -661,15 +661,13 @@ def set(self, key: str, value: JsonValue) -> None:
         from airflow.sdk.execution_time.comms import SetAssetStateByName, SetAssetStateByUri, ToSupervisor
         from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
 
-        # if custom backend is configured, store the value on the custom backend, and return the reference
-        # to the stored value to store in the DB
         backend = _get_worker_state_backend()
         asset_ref = self._name or self._uri or ""
-        stored = (
-            backend.serialize_asset_state_to_ref(value=value, key=key, asset_ref=asset_ref)
-            if backend
-            else value
-        )
+        stored: JsonValue = value
+        if backend is not None:
+            # decorate the value with a marker to indicate that it's stored externally, and include the ref to the external storage
+            ref = backend.serialize_asset_state_to_ref(value=value, key=key, asset_ref=asset_ref)
+            stored = {"__type": "ExternalState", "__var": ref}

Review Comment:
   The envelope is now a wire-protocol contract between the SDK and any 
consumer of `task_state`/`asset_state` values (the UI in this PR, but also any 
future server-side reader or third-party state backend implementer). Right now 
that contract is encoded only as four duplicated dict literals across this one 
file.
   
   Worth documenting in `BaseStateBackend` (shared/state/src/airflow_shared/state/__init__.py) that the returned ref string is *wrapped by the worker* into `{"__type": "ExternalState", "__var": <ref>}` before persistence. A brief note in the class docstring or on `serialize_task_state_to_ref` / `serialize_asset_state_to_ref` would do, so implementers don't try to wrap it themselves. Likewise, the UI consumer / renderer should carry a comment pointing back to this envelope shape, so that changes to the envelope here don't silently break the UI.
   
   (The PR description says "for UI clarity" but doesn't link the UI consumer. 
Could you add a link to the UI PR/issue in the description so the rollout 
sequence is reviewable? Otherwise it's hard to verify the envelope shape 
actually matches what the renderer expects.)
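   As a rough illustration of the docstring note, here is a sketch built on a hypothetical stand-in class rather than the real `BaseStateBackend`. The method names and keyword arguments come from the diff. The keyword-only signatures, the ABC base, and the in-memory toy implementation are assumptions. The asset-state methods would follow the same pattern.

```python
from __future__ import annotations

import uuid
from abc import ABC, abstractmethod
from typing import Any


class StateBackendSketch(ABC):
    """Hypothetical stand-in for ``BaseStateBackend``, showing the proposed note.

    Ref contract: ``serialize_task_state_to_ref`` and ``serialize_asset_state_to_ref``
    must return a plain ``str`` ref. The worker wraps that ref into
    ``{"__type": "ExternalState", "__var": <ref>}`` before persisting it, and
    unwraps it before calling ``deserialize_*_from_ref``. Implementations must
    NOT wrap the ref themselves.
    """

    @abstractmethod
    def serialize_task_state_to_ref(self, *, value: Any, key: str, ti_id: str) -> str: ...

    @abstractmethod
    def deserialize_task_state_from_ref(self, ref: str) -> Any: ...


class InMemoryStateBackend(StateBackendSketch):
    """Toy implementation that follows the contract: it returns a bare string ref."""

    def __init__(self) -> None:
        self._store: dict[str, Any] = {}

    def serialize_task_state_to_ref(self, *, value: Any, key: str, ti_id: str) -> str:
        ref = f"{ti_id}/{key}/{uuid.uuid4().hex}"
        self._store[ref] = value
        return ref  # bare str; the worker adds the envelope

    def deserialize_task_state_from_ref(self, ref: str) -> Any:
        return self._store[ref]
```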



##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -648,11 +647,12 @@ def get(self, key: str) -> JsonValue:
         if isinstance(resp, AssetStateResult):
             stored = resp.value
             backend = _get_worker_state_backend()
-            if backend is not None:
-                # serialize_asset_state_to_ref always returns str by contract; stored contains the ref.
+            if backend is not None and isinstance(stored, dict) and stored.get("__type") == "ExternalState":
+                # unwrap the marker to get the ref, and retrieve the actual value from the backend using the ref
+                ref = stored["__var"]
                 if TYPE_CHECKING:
-                    assert isinstance(stored, str)
-                return backend.deserialize_asset_state_from_ref(stored)
+                    assert isinstance(ref, str)
+                return backend.deserialize_asset_state_from_ref(ref)
             return stored

Review Comment:
   Same silent fallthrough as the task-state read at line 522 -- if a backend 
is configured but `stored` isn't an `ExternalState` envelope, the raw stored 
value is returned to user code without going through 
`backend.deserialize_asset_state_from_ref()`.
   
   Same question applies: are pre-envelope rows expected to exist (rolling 
upgrade), or is this a fresh schema? Either a backwards-compat branch for 
`isinstance(stored, str)` or a log/warning that this code path was hit is worth 
adding.
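   For the "fresh schema" option, a sketch of the log-warning variant. The fallthrough still returns the stored value but becomes visible. `read_asset_state` is a hypothetical standalone function. Its `deserialize` parameter stands in for `backend.deserialize_asset_state_from_ref`, and the logger name is arbitrary.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

log = logging.getLogger("asset_state_sketch")


def read_asset_state(
    stored: Any,
    deserialize: Callable[[str], Any] | None,
    *,
    key: str,
) -> Any:
    """Hypothetical asset-state read; `deserialize` stands in for the backend method."""
    if deserialize is None:
        # No backend configured: the DB row is the value.
        return stored
    if isinstance(stored, dict) and stored.get("__type") == "ExternalState":
        return deserialize(stored["__var"])
    # Backend configured but the row is not an envelope: make the fallthrough visible.
    log.warning(
        "Asset state %r is not an ExternalState envelope although a state backend is configured; "
        "returning the stored value unchanged",
        key,
    )
    return stored
```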



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.