This is an automated email from the ASF dual-hosted git repository.

bolke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new bd32467ede Decode old-style nested Xcom value (#31866)
bd32467ede is described below

commit bd32467ede1a5a197e09456803f7cebaee9f9b77
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Fri Jun 30 02:07:16 2023 +0530

    Decode old-style nested Xcom value (#31866)
    
    The deserializer did not properly handle nested and wrapped old-style XCom values.
    ---------
    
    Co-authored-by: bolkedebruin <[email protected]>
---
 airflow/serialization/serde.py    |  7 ++++++-
 tests/serialization/test_serde.py | 19 ++++++++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git a/airflow/serialization/serde.py b/airflow/serialization/serde.py
index b7e9443441..d996cb9b16 100644
--- a/airflow/serialization/serde.py
+++ b/airflow/serialization/serde.py
@@ -47,6 +47,7 @@ CACHE = "__cache__"
 OLD_TYPE = "__type"
 OLD_SOURCE = "__source"
 OLD_DATA = "__var"
+OLD_DICT = "dict"
 
 DEFAULT_VERSION = 0
 
@@ -275,7 +276,11 @@ def deserialize(o: T | None, full=True, type_hint: Any = None) -> object:
 def _convert(old: dict) -> dict:
     """Converts an old style serialization to new style."""
     if OLD_TYPE in old and OLD_DATA in old:
-        return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA][OLD_DATA]}
+        # Return old style dicts directly as they do not need wrapping
+        if old[OLD_TYPE] == OLD_DICT:
+            return old[OLD_DATA]
+        else:
+            return {CLASSNAME: old[OLD_TYPE], VERSION: DEFAULT_VERSION, DATA: old[OLD_DATA]}
 
     return old
 
diff --git a/tests/serialization/test_serde.py b/tests/serialization/test_serde.py
index 1b2a426ac1..23b4a2ea03 100644
--- a/tests/serialization/test_serde.py
+++ b/tests/serialization/test_serde.py
@@ -257,6 +257,9 @@ class TestSerDe:
             deserialize(data)
 
     def test_backwards_compat(self):
+        """
+        Verify deserialization of old-style encoded Xcom values including nested ones
+        """
         uri = "s3://does_not_exist"
         data = {
             "__type": "airflow.datasets.Dataset",
@@ -264,14 +267,28 @@ class TestSerDe:
             "__var": {
                 "__var": {
                     "uri": uri,
-                    "extra": None,
+                    "extra": {
+                        "__var": {"hi": "bye"},
+                        "__type": "dict",
+                    },
                 },
                 "__type": "dict",
             },
         }
         dataset = deserialize(data)
+        assert dataset.extra == {"hi": "bye"}
         assert dataset.uri == uri
 
+    def test_backwards_compat_wrapped(self):
+        """
+        Verify deserialization of old-style wrapped XCom value
+        """
+        i = {
+            "extra": {"__var": {"hi": "bye"}, "__type": "dict"},
+        }
+        e = deserialize(i)
+        assert e["extra"] == {"hi": "bye"}
+
     def test_encode_dataset(self):
         dataset = Dataset("mytest://dataset")
         obj = deserialize(serialize(dataset))

Reply via email to