This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch v2-0-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 0e0dc73c0b511598414f8d89e595efd09e5087ad
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri Apr 16 03:39:26 2021 +0100

Bugfix: ``TypeError`` when Serializing & sorting iterables (#15395)

This bug was introduced in #14909. Removed sorting from list and tuple
serialization, as lists & tuples preserve order, unlike sets.

The following DAG errors with:
`TypeError: '<' not supported between instances of 'dict' and 'dict'`

```python
from airflow import models
from airflow.operators.dummy import DummyOperator
from datetime import datetime, timedelta

params = {
    "staging_schema": [{"key:":"foo","value":"bar"},
                       {"key:":"this","value":"that"}]
}

with models.DAG(dag_id='test-dag',
                start_date=datetime(2019, 2, 14),
                schedule_interval='30 13 * * *',
                catchup=False,
                max_active_runs=1,
                params=params
                ) as dag:

    my_task = DummyOperator(
        task_id='task1'
    )
```

Full Error:

```
  File "/usr/local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 210, in <dictcomp>
    return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
  File "/usr/local/lib/python3.7/site-packages/airflow/serialization/serialized_objects.py", line 212, in _serialize
    return sorted(cls._serialize(v) for v in var)
TypeError: '<' not supported between instances of 'dict' and 'dict'

During handling of the above exception, another exception occurred:
...
```

This is because `sorted()` does not work on dicts, as they cannot be compared.
Removing the sorting from lists & tuples fixes it. Sorting also fails when a
set contains multiple types.

(cherry picked from commit d1150403a35c497a774a4ffbb1ca4546c532dc81)
---
 airflow/serialization/serialized_objects.py   |  9 ++++---
 tests/serialization/test_dag_serialization.py | 36 +++++++++++++++++++++------
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index a890cd1..b6cfdf2 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -214,7 +214,7 @@ class BaseSerialization:
         elif isinstance(var, dict):
             return cls._encode({str(k): cls._serialize(v) for k, v in var.items()}, type_=DAT.DICT)
         elif isinstance(var, list):
-            return sorted(cls._serialize(v) for v in var)
+            return [cls._serialize(v) for v in var]
         elif HAS_KUBERNETES and isinstance(var, k8s.V1Pod):
             json_pod = PodGenerator.serialize_pod(var)
             return cls._encode(json_pod, type_=DAT.POD)
@@ -240,10 +240,13 @@ class BaseSerialization:
             return str(get_python_source(var))
         elif isinstance(var, set):
             # FIXME: casts set to list in customized serialization in future.
-            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
+            try:
+                return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.SET)
+            except TypeError:
+                return cls._encode([cls._serialize(v) for v in var], type_=DAT.SET)
         elif isinstance(var, tuple):
             # FIXME: casts tuple to list in customized serialization in future.
-            return cls._encode(sorted(cls._serialize(v) for v in var), type_=DAT.TUPLE)
+            return cls._encode([cls._serialize(v) for v in var], type_=DAT.TUPLE)
         elif isinstance(var, TaskGroup):
             return SerializedTaskGroup.serialize_task_group(var)
         else:
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index e447751..895f2cf 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -357,10 +357,9 @@ class TestStringifiedDAGs(unittest.TestCase):
             "_task_group",
         }
         for field in fields_to_check:
-            dag_field = getattr(dag, field)
-            if isinstance(dag_field, list):
-                dag_field = sorted(dag_field)
-            assert getattr(serialized_dag, field) == dag_field, f'{dag.dag_id}.{field} does not match'
+            assert getattr(serialized_dag, field) == getattr(
+                dag, field
+            ), f'{dag.dag_id}.{field} does not match'
 
         if dag.default_args:
             for k, v in dag.default_args.items():
@@ -1041,7 +1040,7 @@ class TestStringifiedDAGs(unittest.TestCase):
         [
             (
                 ['task_1', 'task_5', 'task_2', 'task_4'],
-                ['task_1', 'task_2', 'task_4', 'task_5'],
+                ['task_1', 'task_5', 'task_2', 'task_4'],
             ),
             (
                 {'task_1', 'task_5', 'task_2', 'task_4'},
@@ -1049,16 +1048,39 @@ class TestStringifiedDAGs(unittest.TestCase):
             ),
             (
                 ('task_1', 'task_5', 'task_2', 'task_4'),
-                ['task_1', 'task_2', 'task_4', 'task_5'],
+                ['task_1', 'task_5', 'task_2', 'task_4'],
+            ),
+            (
+                {
+                    "staging_schema": [
+                        {"key:": "foo", "value": "bar"},
+                        {"key:": "this", "value": "that"},
+                        "test_conf",
+                    ]
+                },
+                {
+                    "staging_schema": [
+                        {"__type": "dict", "__var": {"key:": "foo", "value": "bar"}},
+                        {
+                            "__type": "dict",
+                            "__var": {"key:": "this", "value": "that"},
+                        },
+                        "test_conf",
+                    ]
+                },
             ),
             (
                 {"task3": "test3", "task2": "test2", "task1": "test1"},
                 {"task1": "test1", "task2": "test2", "task3": "test3"},
             ),
+            (
+                ('task_1', 'task_5', 'task_2', 3, ["x", "y"]),
+                ['task_1', 'task_5', 'task_2', 3, ["x", "y"]],
+            ),
         ]
     )
     def test_serialized_objects_are_sorted(self, object_to_serialized, expected_output):
-        """Test Serialized Lists, Sets and Tuples are sorted"""
+        """Test Serialized Sets are sorted while list and tuple preserve order"""
         serialized_obj = SerializedDAG._serialize(object_to_serialized)
         if isinstance(serialized_obj, dict) and "__type" in serialized_obj:
             serialized_obj = serialized_obj["__var"]
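The commit message above attributes the failure to `sorted()` being unable to compare dicts. The following is a minimal standalone sketch of that failure mode and of the try/except fallback the fix applies to sets; the helper name `serialize_collection` is hypothetical and only stands in for the real logic in `BaseSerialization._serialize` shown in the diff.

```python
# Standalone sketch (not part of the commit). `serialize_collection` is a made-up
# stand-in for BaseSerialization._serialize in airflow/serialization/serialized_objects.py.

def serialize_collection(var):
    if isinstance(var, (list, tuple)):
        # Lists and tuples preserve insertion order, so they are emitted as-is.
        return list(var)
    if isinstance(var, (set, frozenset)):
        # Sets have no inherent order; sort for a stable representation, but fall
        # back to the unsorted elements when they are not mutually comparable.
        try:
            return sorted(var)
        except TypeError:
            return list(var)
    raise TypeError(f"unsupported type: {type(var)}")


params = [{"key:": "foo", "value": "bar"}, {"key:": "this", "value": "that"}]

# This is the comparison that blew up before the fix:
try:
    sorted(params)
except TypeError as exc:
    print(exc)  # '<' not supported between instances of 'dict' and 'dict'

print(serialize_collection(params))                # order preserved, no sorting
print(serialize_collection({"task_2", "task_1"}))  # ['task_1', 'task_2']
print(serialize_collection({"task_1", 3}))         # mixed types: falls back, unsorted
```

The try/except mirrors the change to the `DAT.SET` branch in the diff; lists and tuples are simply emitted in their original order, which is why the test's expected outputs now match the input order.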
