amoghrajesh commented on code in PR #67530:
URL: https://github.com/apache/airflow/pull/67530#discussion_r3309009526
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -548,12 +547,12 @@ def set(self, key: str, value: JsonValue, *, retention:
timedelta | None = None)
# if custom backend is configured, store the value on the custom
backend, and return the reference
# to the stored value to store in the DB
+ stored: JsonValue = value
backend = _get_worker_state_backend()
- stored = (
- backend.serialize_task_state_to_ref(value=value, key=key,
ti_id=str(self._ti_id))
- if backend
- else value
- )
+ if backend is not None:
+ # decorate the value with a marker to indicate that it's stored
externally, and include the ref to the external storage
+ ref = backend.serialize_task_state_to_ref(value=value, key=key,
ti_id=str(self._ti_id))
+ stored = {"__type": "ExternalState", "__var": ref}
Review Comment:
Handled in
https://github.com/apache/airflow/commit/d101f308a37312809a2a4c9486b036fc8c11bb6f
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -648,11 +647,12 @@ def get(self, key: str) -> JsonValue:
if isinstance(resp, AssetStateResult):
stored = resp.value
backend = _get_worker_state_backend()
- if backend is not None:
- # serialize_asset_state_to_ref always returns str by contract;
stored contains the ref.
+ if backend is not None and isinstance(stored, dict) and
stored.get("__type") == "ExternalState":
+ # unwrap the marker to get the ref, and retrieve the actual
value from the backend using the ref
+ ref = stored["__var"]
if TYPE_CHECKING:
- assert isinstance(stored, str)
- return backend.deserialize_asset_state_from_ref(stored)
+ assert isinstance(ref, str)
+ return backend.deserialize_asset_state_from_ref(ref)
return stored
Review Comment:
Handled in
https://github.com/apache/airflow/commit/d101f308a37312809a2a4c9486b036fc8c11bb6f
##########
task-sdk/src/airflow/sdk/execution_time/context.py:
##########
@@ -661,15 +661,13 @@ def set(self, key: str, value: JsonValue) -> None:
from airflow.sdk.execution_time.comms import SetAssetStateByName,
SetAssetStateByUri, ToSupervisor
from airflow.sdk.execution_time.task_runner import SUPERVISOR_COMMS
- # if custom backend is configured, store the value on the custom
backend, and return the reference
- # to the stored value to store in the DB
backend = _get_worker_state_backend()
asset_ref = self._name or self._uri or ""
- stored = (
- backend.serialize_asset_state_to_ref(value=value, key=key,
asset_ref=asset_ref)
- if backend
- else value
- )
+ stored: JsonValue = value
+ if backend is not None:
+ # decorate the value with a marker to indicate that it's stored
externally, and include the ref to the external storage
+ ref = backend.serialize_asset_state_to_ref(value=value, key=key,
asset_ref=asset_ref)
+ stored = {"__type": "ExternalState", "__var": ref}
Review Comment:
Handled in
https://github.com/apache/airflow/commit/d101f308a37312809a2a4c9486b036fc8c11bb6f
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]