This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-9-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 92777adbb367d549c654d6ae9856d0f19d671a81
Author: Usiel Riedl <[email protected]>
AuthorDate: Thu Jun 13 11:11:16 2024 +0800

    Ensures DAG params order regardless of backend (#40156)
    
    * Ensures DAG params order regardless of backend
    
    Fixes https://github.com/apache/airflow/issues/40154
    
    This change adds an extra attribute to the serialized DAG param objects which helps us decide
    the order of the deserialized params dictionary later even if the backend messes with us.
    
    I decided not to limit this just to MySQL since the operation is inexpensive and may turn
    out to be helpful.
    
    I made sure the new test fails with the old implementation + MySQL. I assume this test will be
    executed with MySQL somewhere in the build actions?
    
    * Removes GitHub reference
    
    Co-authored-by: Jed Cunningham <[email protected]>
    
    * Serialize DAG params as array of tuples to ensure ordering
    
    Alternative to previous approach: We serialize the DAG params dict as a list of tuples which _should_ keep their ordering regardless of backend.
    
    Backwards compatibility is ensured because if `encoded_params` is a `dict` (not the expected `list`) then `dict(encoded_params)` still works.
    
    * Make backwards compatibility more explicit
    
    Based on suggestions by @uranusjr with an additional fix to make mypy happy.
    
    ---------
    
    Co-authored-by: Jed Cunningham <[email protected]>
    (cherry picked from commit 2149b4dbee8fb524bb235280aaef158afaec8d4a)
---
 airflow/serialization/schema.json             | 14 +++++----
 airflow/serialization/serialized_objects.py   | 18 ++++++++----
 tests/models/test_serialized_dag.py           | 16 ++++++++++
 tests/serialization/test_dag_serialization.py | 42 ++++++++++++++++++++-------
 4 files changed, 69 insertions(+), 21 deletions(-)

diff --git a/airflow/serialization/schema.json b/airflow/serialization/schema.json
index a2a6732763..3df4b533d8 100644
--- a/airflow/serialization/schema.json
+++ b/airflow/serialization/schema.json
@@ -136,7 +136,7 @@
     "dag": {
       "type": "object",
       "properties": {
-        "params": { "$ref": "#/definitions/params_dict" },
+        "params": { "$ref": "#/definitions/params" },
         "_dag_id": { "type": "string" },
         "tasks": {  "$ref": "#/definitions/tasks" },
         "timezone": { "$ref": "#/definitions/timezone" },
@@ -206,9 +206,13 @@
       "type": "array",
       "additionalProperties": { "$ref": "#/definitions/operator" }
     },
-    "params_dict": {
-      "type": "object",
-      "additionalProperties": {"$ref": "#/definitions/param" }
+    "params": {
+      "type": "array",
+      "prefixItems": [
+          { "type": "string" },
+          { "$ref": "#/definitions/param" }
+      ],
+      "unevaluatedItems": false
     },
     "param": {
       "$comment": "A param for a dag / operator",
@@ -258,7 +262,7 @@
         "retry_delay": { "$ref": "#/definitions/timedelta" },
         "retry_exponential_backoff": { "type": "boolean" },
         "max_retry_delay": { "$ref": "#/definitions/timedelta" },
-        "params": { "$ref": "#/definitions/params_dict" },
+        "params": { "$ref": "#/definitions/params" },
         "priority_weight": { "type": "number" },
         "weight_rule": { "type": "string" },
         "executor_config": { "$ref": "#/definitions/dict" },
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 16a5c9e481..fdc0523488 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -777,9 +777,9 @@ class BaseSerialization:
         return class_(**kwargs)
 
     @classmethod
-    def _serialize_params_dict(cls, params: ParamsDict | dict):
-        """Serialize Params dict for a DAG or task."""
-        serialized_params = {}
+    def _serialize_params_dict(cls, params: ParamsDict | dict) -> 
list[tuple[str, dict]]:
+        """Serialize Params dict for a DAG or task as a list of tuples to 
ensure ordering."""
+        serialized_params = []
         for k, v in params.items():
             # TODO: As of now, we would allow serialization of params which 
are of type Param only.
             try:
@@ -787,7 +787,7 @@ class BaseSerialization:
             except AttributeError:
                 class_identity = ""
             if class_identity == "airflow.models.param.Param":
-                serialized_params[k] = cls._serialize_param(v)
+                serialized_params.append((k, cls._serialize_param(v)))
             else:
                 raise ValueError(
                     f"Params to a DAG or a Task can be only of type 
airflow.models.param.Param, "
@@ -796,10 +796,16 @@ class BaseSerialization:
         return serialized_params
 
     @classmethod
-    def _deserialize_params_dict(cls, encoded_params: dict) -> ParamsDict:
+    def _deserialize_params_dict(cls, encoded_params: list[tuple[str, dict]]) 
-> ParamsDict:
         """Deserialize a DAG's Params dict."""
+        if isinstance(encoded_params, collections.abc.Mapping):
+            # in 2.9.2 or earlier params were serialized as JSON objects
+            encoded_param_pairs: Iterable[tuple[str, dict]] = 
encoded_params.items()
+        else:
+            encoded_param_pairs = encoded_params
+
         op_params = {}
-        for k, v in encoded_params.items():
+        for k, v in encoded_param_pairs:
             if isinstance(v, dict) and "__class" in v:
                 op_params[k] = cls._deserialize_param(v)
             else:
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index 627943c6fd..cb62976e55 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -206,6 +206,22 @@ class TestSerializedDagModel:
         expected_dependencies = {dag_id: [] for dag_id in example_dags}
         assert SDM.get_dag_dependencies() == expected_dependencies
 
+    def test_order_of_dag_params_is_stable(self):
+        """
+        This asserts that we have logic in place which guarantees the order
+        of the params is maintained - even if the backend (e.g. MySQL) mutates
+        the serialized DAG JSON.
+        """
+        example_dags = make_example_dags(example_dags_module)
+        example_params_trigger_ui = 
example_dags.get("example_params_trigger_ui")
+        before = list(example_params_trigger_ui.params.keys())
+
+        SDM.write_dag(example_params_trigger_ui)
+        retrieved_dag = SDM.get_dag("example_params_trigger_ui")
+        after = list(retrieved_dag.params.keys())
+
+        assert before == after
+
     def test_order_of_deps_is_consistent(self):
         """
         Previously the 'dag_dependencies' node in serialized dag was converted 
to list from set.
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 5170722363..30ebbb957b 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -232,7 +232,7 @@ serialized_simple_dag_ground_truth = {
         },
         "edge_info": {},
         "dag_dependencies": [],
-        "params": {},
+        "params": [],
     },
 }
 
@@ -2034,6 +2034,25 @@ class TestStringifiedDAGs:
         assert isinstance(dag.params.get_param("none"), Param)
         assert dag.params["str"] == "str"
 
+    def test_params_serialization_from_dict_upgrade(self):
+        """In <=2.9.2 params were serialized as a JSON object instead of a 
list of key-value pairs.
+        This test asserts that the params are still deserialized properly."""
+        serialized = {
+            "__version": 1,
+            "dag": {
+                "_dag_id": "simple_dag",
+                "fileloc": "/path/to/file.py",
+                "tasks": [],
+                "timezone": "UTC",
+                "params": {"my_param": {"__class": 
"airflow.models.param.Param", "default": "str"}},
+            },
+        }
+        dag = SerializedDAG.from_dict(serialized)
+
+        param = dag.params.get_param("my_param")
+        assert isinstance(param, Param)
+        assert param.value == "str"
+
     def test_params_serialize_default_2_2_0(self):
         """In 2.0.0, param ``default`` was assumed to be json-serializable 
objects and were not run though
         the standard serializer function.  In 2.2.2 we serialize param 
``default``.  We keep this
@@ -2045,7 +2064,7 @@ class TestStringifiedDAGs:
                 "fileloc": "/path/to/file.py",
                 "tasks": [],
                 "timezone": "UTC",
-                "params": {"str": {"__class": "airflow.models.param.Param", 
"default": "str"}},
+                "params": [["str", {"__class": "airflow.models.param.Param", 
"default": "str"}]],
             },
         }
         SerializedDAG.validate_schema(serialized)
@@ -2062,14 +2081,17 @@ class TestStringifiedDAGs:
                 "fileloc": "/path/to/file.py",
                 "tasks": [],
                 "timezone": "UTC",
-                "params": {
-                    "my_param": {
-                        "default": "a string value",
-                        "description": "hello",
-                        "schema": {"__var": {"type": "string"}, "__type": 
"dict"},
-                        "__class": "airflow.models.param.Param",
-                    }
-                },
+                "params": [
+                    [
+                        "my_param",
+                        {
+                            "default": "a string value",
+                            "description": "hello",
+                            "schema": {"__var": {"type": "string"}, "__type": 
"dict"},
+                            "__class": "airflow.models.param.Param",
+                        },
+                    ]
+                ],
             },
         }
         SerializedDAG.validate_schema(serialized)

Reply via email to