This is an automated email from the ASF dual-hosted git repository.
bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new bd32467ede Decode old-style nested Xcom value (#31866)
bd32467ede is described below
commit bd32467ede1a5a197e09456803f7cebaee9f9b77
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Fri Jun 30 02:07:16 2023 +0530
Decode old-style nested Xcom value (#31866)
The deserializer was not properly dealing with nested and wrapped old-style
xcom values.
---------
Co-authored-by: bolkedebruin <[email protected]>
---
airflow/serialization/serde.py | 7 ++++++-
tests/serialization/test_serde.py | 19 ++++++++++++++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git a/airflow/serialization/serde.py b/airflow/serialization/serde.py
index b7e9443441..d996cb9b16 100644
--- a/airflow/serialization/serde.py
+++ b/airflow/serialization/serde.py
@@ -47,6 +47,7 @@ CACHE = "__cache__"
OLD_TYPE = "__type"
OLD_SOURCE = "__source"
OLD_DATA = "__var"
+OLD_DICT = "dict"
DEFAULT_VERSION = 0
@@ -275,7 +276,11 @@ def deserialize(o: T | None, full=True, type_hint: Any =
None) -> object:
def _convert(old: dict) -> dict:
"""Converts an old style serialization to new style."""
if OLD_TYPE in old and OLD_DATA in old:
- return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA:
old[OLD_DATA][OLD_DATA]}
+ # Return old style dicts directly as they do not need wrapping
+ if old[OLD_TYPE] == OLD_DICT:
+ return old[OLD_DATA]
+ else:
+ return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA:
old[OLD_DATA]}
return old
diff --git a/tests/serialization/test_serde.py
b/tests/serialization/test_serde.py
index 1b2a426ac1..23b4a2ea03 100644
--- a/tests/serialization/test_serde.py
+++ b/tests/serialization/test_serde.py
@@ -257,6 +257,9 @@ class TestSerDe:
deserialize(data)
def test_backwards_compat(self):
+ """
+ Verify deserialization of old-style encoded Xcom values including
nested ones
+ """
uri = "s3://does_not_exist"
data = {
"__type": "airflow.datasets.Dataset",
@@ -264,14 +267,28 @@ class TestSerDe:
"__var": {
"__var": {
"uri": uri,
- "extra": None,
+ "extra": {
+ "__var": {"hi": "bye"},
+ "__type": "dict",
+ },
},
"__type": "dict",
},
}
dataset = deserialize(data)
+ assert dataset.extra == {"hi": "bye"}
assert dataset.uri == uri
+ def test_backwards_compat_wrapped(self):
+ """
+ Verify deserialization of old-style wrapped XCom value
+ """
+ i = {
+ "extra": {"__var": {"hi": "bye"}, "__type": "dict"},
+ }
+ e = deserialize(i)
+ assert e["extra"] == {"hi": "bye"}
+
def test_encode_dataset(self):
dataset = Dataset("mytest://dataset")
obj = deserialize(serialize(dataset))