This is an automated email from the ASF dual-hosted git repository. utkarsharma pushed a commit to branch v2-9-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 92777adbb367d549c654d6ae9856d0f19d671a81 Author: Usiel Riedl <[email protected]> AuthorDate: Thu Jun 13 11:11:16 2024 +0800 Ensures DAG params order regardless of backend (#40156) * Ensures DAG params order regardless of backend Fixes https://github.com/apache/airflow/issues/40154 This change adds an extra attribute to the serialized DAG param objects which helps us decide the order of the deserialized params dictionary later even if the backend messes with us. I decided not to limit this just to MySQL since the operation is inexpensive and may turn out to be helpful. I made sure the new test fails with the old implementation + MySQL. I assume this test will be executed with MySQL somewhere in the build actions? * Removes GitHub reference Co-authored-by: Jed Cunningham <[email protected]> * Serialize DAG params as array of tuples to ensure ordering Alternative to previous approach: We serialize the DAG params dict as a list of tuples which _should_ keep their ordering regardless of backend. Backwards compatibility is ensured because if `encoded_params` is a `dict` (not the expected `list`) then `dict(encoded_params)` still works. * Make backwards compatibility more explicit Based on suggestions by @uranusjr with an additional fix to make mypy happy. --------- Co-authored-by: Jed Cunningham <[email protected]> (cherry picked from commit 2149b4dbee8fb524bb235280aaef158afaec8d4a) --- airflow/serialization/schema.json | 14 +++++---- airflow/serialization/serialized_objects.py | 18 ++++++++---- tests/models/test_serialized_dag.py | 16 ++++++++++ tests/serialization/test_dag_serialization.py | 42 ++++++++++++++++++++------- 4 files changed, 69 insertions(+), 21 deletions(-) diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json index a2a6732763..3df4b533d8 100644 --- a/airflow/serialization/schema.json +++ b/airflow/serialization/schema.json @@ -136,7 +136,7 @@ "dag": { "type": "object", "properties": { - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "_dag_id": { "type": "string" }, "tasks": { "$ref": "#/definitions/tasks" }, "timezone": { "$ref": "#/definitions/timezone" }, @@ -206,9 +206,13 @@ "type": "array", "additionalProperties": { "$ref": "#/definitions/operator" } }, - "params_dict": { - "type": "object", - "additionalProperties": {"$ref": "#/definitions/param" } + "params": { + "type": "array", + "prefixItems": [ + { "type": "string" }, + { "$ref": "#/definitions/param" } + ], + "unevaluatedItems": false }, "param": { "$comment": "A param for a dag / operator", @@ -258,7 +262,7 @@ "retry_delay": { "$ref": "#/definitions/timedelta" }, "retry_exponential_backoff": { "type": "boolean" }, "max_retry_delay": { "$ref": "#/definitions/timedelta" }, - "params": { "$ref": "#/definitions/params_dict" }, + "params": { "$ref": "#/definitions/params" }, "priority_weight": { "type": "number" }, "weight_rule": { "type": "string" }, "executor_config": { "$ref": "#/definitions/dict" }, diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py index 16a5c9e481..fdc0523488 100644 --- a/airflow/serialization/serialized_objects.py +++ b/airflow/serialization/serialized_objects.py @@ -777,9 +777,9 @@ class BaseSerialization: return class_(**kwargs) @classmethod - def _serialize_params_dict(cls, params: ParamsDict | dict): - """Serialize Params dict for a DAG or task.""" - serialized_params = {} + def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]: + """Serialize Params dict for a DAG or task as a list of tuples to ensure ordering.""" + serialized_params = [] for k, v in params.items(): # TODO: As of now, we would allow serialization of params which are of type Param only. try: @@ -787,7 +787,7 @@ class BaseSerialization: except AttributeError: class_identity = "" if class_identity == "airflow.models.param.Param": - serialized_params[k] = cls._serialize_param(v) + serialized_params.append((k, cls._serialize_param(v))) else: raise ValueError( f"Params to a DAG or a Task can be only of type airflow.models.param.Param, " @@ -796,10 +796,16 @@ class BaseSerialization: return serialized_params @classmethod - def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict: + def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) -> ParamsDict: """Deserialize a DAG's Params dict.""" + if isinstance(encoded_params, collections.abc.Mapping): + # in 2.9.2 or earlier params were serialized as JSON objects + encoded_param_pairs: Iterable[tuple[str, dict]] = encoded_params.items() + else: + encoded_param_pairs = encoded_params + op_params = {} - for k, v in encoded_params.items(): + for k, v in encoded_param_pairs: if isinstance(v, dict) and "__class" in v: op_params[k] = cls._deserialize_param(v) else: diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py index 627943c6fd..cb62976e55 100644 --- a/tests/models/test_serialized_dag.py +++ b/tests/models/test_serialized_dag.py @@ -206,6 +206,22 @@ class TestSerializedDagModel: expected_dependencies = {dag_id: [] for dag_id in example_dags} assert SDM.get_dag_dependencies() == expected_dependencies + def test_order_of_dag_params_is_stable(self): + """ + This asserts that we have logic in place which guarantees the order + of the params is maintained - even if the backend (e.g. MySQL) mutates + the serialized DAG JSON. + """ + example_dags = make_example_dags(example_dags_module) + example_params_trigger_ui = example_dags.get("example_params_trigger_ui") + before = list(example_params_trigger_ui.params.keys()) + + SDM.write_dag(example_params_trigger_ui) + retrieved_dag = SDM.get_dag("example_params_trigger_ui") + after = list(retrieved_dag.params.keys()) + + assert before == after + def test_order_of_deps_is_consistent(self): """ Previously the 'dag_dependencies' node in serialized dag was converted to list from set. diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py index 5170722363..30ebbb957b 100644 --- a/tests/serialization/test_dag_serialization.py +++ b/tests/serialization/test_dag_serialization.py @@ -232,7 +232,7 @@ serialized_simple_dag_ground_truth = { }, "edge_info": {}, "dag_dependencies": [], - "params": {}, + "params": [], }, } @@ -2034,6 +2034,25 @@ class TestStringifiedDAGs: assert isinstance(dag.params.get_param("none"), Param) assert dag.params["str"] == "str" + def test_params_serialization_from_dict_upgrade(self): + """In <=2.9.2 params were serialized as a JSON object instead of a list of key-value pairs. + This test asserts that the params are still deserialized properly.""" + serialized = { + "__version": 1, + "dag": { + "_dag_id": "simple_dag", + "fileloc": "/path/to/file.py", + "tasks": [], + "timezone": "UTC", + "params": {"my_param": {"__class": "airflow.models.param.Param", "default": "str"}}, + }, + } + dag = SerializedDAG.from_dict(serialized) + + param = dag.params.get_param("my_param") + assert isinstance(param, Param) + assert param.value == "str" + def test_params_serialize_default_2_2_0(self): """In 2.0.0, param ``default`` was assumed to be json-serializable objects and were not run though the standard serializer function. In 2.2.2 we serialize param ``default``. We keep this @@ -2045,7 +2064,7 @@ class TestStringifiedDAGs: "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": {"str": {"__class": "airflow.models.param.Param", "default": "str"}}, + "params": [["str", {"__class": "airflow.models.param.Param", "default": "str"}]], }, } SerializedDAG.validate_schema(serialized) @@ -2062,14 +2081,17 @@ class TestStringifiedDAGs: "fileloc": "/path/to/file.py", "tasks": [], "timezone": "UTC", - "params": { - "my_param": { - "default": "a string value", - "description": "hello", - "schema": {"__var": {"type": "string"}, "__type": "dict"}, - "__class": "airflow.models.param.Param", - } - }, + "params": [ + [ + "my_param", + { + "default": "a string value", + "description": "hello", + "schema": {"__var": {"type": "string"}, "__type": "dict"}, + "__class": "airflow.models.param.Param", + }, + ] + ], }, } SerializedDAG.validate_schema(serialized)
