This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d46a9d12065 Fix inconsistent Dag hashes when template fields contain unordered dicts (#59091)
d46a9d12065 is described below

commit d46a9d120657d7fa0c70477d961120024fecd893
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Dec 7 22:09:32 2025 +0100

    Fix inconsistent Dag hashes when template fields contain unordered dicts (#59091)
    
    Dags using operators with dictionary values in template_fields (such as
    env_vars) were getting different hashes on each parse, even when the
    actual dictionary content was unchanged. This happened because
    serialize_template_field converts dictionaries to a string
    using str(), which preserves insertion order. When dictionary ordering
    varies between parses (e.g., when env_vars comes from os.environ.copy()),
    the string representation differs, causing inconsistent hashing.
    
    Prevents unnecessary Dag updates and reprocessing when only dictionary
    ordering differs in template fields.
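
    A minimal standalone sketch of the mechanism described above, using
    made-up values (this note and snippet are not part of the patch below):
    str() is sensitive to dict insertion order, while sorting the items
    first yields an order-independent representation.

        a = {"KEY1": "v1", "KEY2": "v2"}
        b = {"KEY2": "v2", "KEY1": "v1"}  # same content, different insertion order

        assert a == b                     # dict equality ignores ordering
        assert str(a) != str(b)           # but str() preserves insertion order

        # Sorting the items first gives a stable string to hash.
        assert str(dict(sorted(a.items()))) == str(dict(sorted(b.items())))
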
---
 airflow-core/src/airflow/serialization/helpers.py  | 14 ++++++
 .../tests/unit/models/test_serialized_dag.py       | 53 ++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/airflow-core/src/airflow/serialization/helpers.py b/airflow-core/src/airflow/serialization/helpers.py
index e9d90fe61ae..3af7f3c07e7 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -55,6 +55,16 @@ def serialize_template_field(template_field: Any, name: str) -> str | dict | lis
             return {key: translate_tuples_to_lists(value) for key, value in obj.items()}
         return obj
 
+    def sort_dict_recursively(obj: Any) -> Any:
+        """Recursively sort dictionaries to ensure consistent ordering."""
+        if isinstance(obj, dict):
+            return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())}
+        if isinstance(obj, list):
+            return [sort_dict_recursively(item) for item in obj]
+        if isinstance(obj, tuple):
+            return tuple(sort_dict_recursively(item) for item in obj)
+        return obj
+
     max_length = conf.getint("core", "max_templated_field_length")
 
     if not is_jsonable(template_field):
@@ -74,6 +84,10 @@ def serialize_template_field(template_field: Any, name: str) -> str | dict | lis
         # and need to be converted to lists
         return template_field
     template_field = translate_tuples_to_lists(template_field)
+    # Sort dictionaries recursively to ensure consistent string representation
+    # This prevents hash inconsistencies when dict ordering varies
+    if isinstance(template_field, dict):
+        template_field = sort_dict_recursively(template_field)
     serialized = str(template_field)
     if len(serialized) > max_length:
         rendered = redact(serialized, name)
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py
index 65877061dae..2472c072046 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -554,6 +554,59 @@ class TestSerializedDagModel:
         assert "fileloc" in test_data["dag"]
         assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
 
+    def test_hash_method_consistent_with_dict_ordering_in_template_fields(self, dag_maker):
+        from airflow.sdk.bases.operator import BaseOperator
+
+        class MyCustomOp(BaseOperator):
+            template_fields = ("env_vars",)
+
+            def __init__(self, *, task_id: str, **kwargs):
+                super().__init__(task_id=task_id, **kwargs)
+                self.env_vars = {"KEY1": "value1", "KEY2": "value2", "KEY3": "value3"}
+
+        # Create first DAG with env_vars in one order
+        with dag_maker("test_dag") as dag1:
+            MyCustomOp(task_id="task1")
+
+        serialized_dag_1 = SerializedDAG.to_dict(dag1)
+
+        # Create second DAG with env_vars in different order
+        with dag_maker("test_dag") as dag2:
+            task = MyCustomOp(task_id="task1")
+            # Recreate dict with different insertion order
+            task.env_vars = {"KEY3": "value3", "KEY1": "value1", "KEY2": "value2"}
+
+        serialized_dag_2 = SerializedDAG.to_dict(dag2)
+
+        # Verify that the original env_vars have different ordering
+        env_vars_1 = None
+        env_vars_2 = None
+        for task in serialized_dag_1["dag"]["tasks"]:
+            if task["__var"]["task_id"] == "task1":
+                env_vars_1 = task["__var"].get("env_vars")
+        for task in serialized_dag_2["dag"]["tasks"]:
+            if task["__var"]["task_id"] == "task1":
+                env_vars_2 = task["__var"].get("env_vars")
+
+        assert env_vars_1 is not None, "serialized_dag_1 should have env_vars"
+        assert env_vars_2 is not None, "serialized_dag_2 should have env_vars"
+        # The serialized env_vars should be sorted dicts (or strings if truncated)
+        # If they're dicts, verify they're sorted; if strings, they should be equal due to sorting
+        if isinstance(env_vars_1, dict) and isinstance(env_vars_2, dict):
+            # Both should be sorted dictionaries with same content
+            assert list(env_vars_1.keys()) == sorted(env_vars_1.keys())
+            assert list(env_vars_2.keys()) == sorted(env_vars_2.keys())
+            assert env_vars_1 == env_vars_2, "Sorted dicts should be equal 
regardless of original order"
+        elif isinstance(env_vars_1, str) and isinstance(env_vars_2, str):
+            # If truncated to strings, they should be equal due to sorting
+            assert env_vars_1 == env_vars_2, "String representations should be 
equal due to sorting"
+
+        hash_1 = SDM.hash(serialized_dag_1)
+        hash_2 = SDM.hash(serialized_dag_2)
+
+        # Hashes should be identical
+        assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently"
+
     def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
         """
         Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.

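For reference, a self-contained sketch that mirrors the sort_dict_recursively
helper added to helpers.py above, applied to made-up nested data; it shows that
the normalized str() representation (and therefore any hash computed over it)
no longer depends on insertion order.

    from typing import Any

    def sort_dict_recursively(obj: Any) -> Any:
        """Recursively sort dictionaries to ensure consistent ordering."""
        if isinstance(obj, dict):
            return {k: sort_dict_recursively(v) for k, v in sorted(obj.items())}
        if isinstance(obj, list):
            return [sort_dict_recursively(item) for item in obj]
        if isinstance(obj, tuple):
            return tuple(sort_dict_recursively(item) for item in obj)
        return obj

    # Made-up template-field values: same content, different insertion order.
    env_a = {"B": {"y": 2, "x": 1}, "A": ["k", ("m", "n")]}
    env_b = {"A": ["k", ("m", "n")], "B": {"x": 1, "y": 2}}

    # After normalization the string representations agree, so a hash computed
    # over the serialized field is stable across parses.
    assert str(sort_dict_recursively(env_a)) == str(sort_dict_recursively(env_b))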