This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new cacd13873b9 [v3-1-test] Fix inconsistent Dag hashes when template 
fields contain unordered dicts (#59091) (#59175)
cacd13873b9 is described below

commit cacd13873b99c2873fc967b0af3b746d57dd266a
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sun Dec 7 22:11:25 2025 +0100

    [v3-1-test] Fix inconsistent Dag hashes when template fields contain 
unordered dicts (#59091) (#59175)
    
    Dags using operators with dictionary values in template_fields (such as
    env_vars) were getting different hashes on each parse, even when the
    actual dictionary content was unchanged. This happened because
    serialize_template_field converts dictionaries to strings
using str(), which preserves insertion order. When dictionary ordering
    varies between parses (e.g., when env_vars comes from os.environ.copy()),
    the string representation differs, causing inconsistent hashing.
    
    Prevents unnecessary Dag updates and reprocessing when only dictionary
    ordering differs in template fields.
    (cherry picked from commit d46a9d120657d7fa0c70477d961120024fecd893)
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 airflow-core/src/airflow/serialization/helpers.py  | 14 ++++++
 .../tests/unit/models/test_serialized_dag.py       | 53 ++++++++++++++++++++++
 2 files changed, 67 insertions(+)

diff --git a/airflow-core/src/airflow/serialization/helpers.py 
b/airflow-core/src/airflow/serialization/helpers.py
index 949b3cb9c9f..5f564df6188 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -51,6 +51,16 @@ def serialize_template_field(template_field: Any, name: str) 
-> str | dict | lis
             return {key: translate_tuples_to_lists(value) for key, value in 
obj.items()}
         return obj
 
+    def sort_dict_recursively(obj: Any) -> Any:
+        """Recursively sort dictionaries to ensure consistent ordering."""
+        if isinstance(obj, dict):
+            return {k: sort_dict_recursively(v) for k, v in 
sorted(obj.items())}
+        if isinstance(obj, list):
+            return [sort_dict_recursively(item) for item in obj]
+        if isinstance(obj, tuple):
+            return tuple(sort_dict_recursively(item) for item in obj)
+        return obj
+
     max_length = conf.getint("core", "max_templated_field_length")
 
     if not is_jsonable(template_field):
@@ -70,6 +80,10 @@ def serialize_template_field(template_field: Any, name: str) 
-> str | dict | lis
         # and need to be converted to lists
         return template_field
     template_field = translate_tuples_to_lists(template_field)
+    # Sort dictionaries recursively to ensure consistent string representation
+    # This prevents hash inconsistencies when dict ordering varies
+    if isinstance(template_field, dict):
+        template_field = sort_dict_recursively(template_field)
     serialized = str(template_field)
     if len(serialized) > max_length:
         rendered = redact(serialized, name)
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py 
b/airflow-core/tests/unit/models/test_serialized_dag.py
index 92446cee218..a81ecaf13fc 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -545,6 +545,59 @@ class TestSerializedDagModel:
         assert "fileloc" in test_data["dag"]
         assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
 
+    def 
test_hash_method_consistent_with_dict_ordering_in_template_fields(self, 
dag_maker):
+        from airflow.sdk.bases.operator import BaseOperator
+
+        class MyCustomOp(BaseOperator):
+            template_fields = ("env_vars",)
+
+            def __init__(self, *, task_id: str, **kwargs):
+                super().__init__(task_id=task_id, **kwargs)
+                self.env_vars = {"KEY1": "value1", "KEY2": "value2", "KEY3": 
"value3"}
+
+        # Create first DAG with env_vars in one order
+        with dag_maker("test_dag") as dag1:
+            MyCustomOp(task_id="task1")
+
+        serialized_dag_1 = SerializedDAG.to_dict(dag1)
+
+        # Create second DAG with env_vars in different order
+        with dag_maker("test_dag") as dag2:
+            task = MyCustomOp(task_id="task1")
+            # Recreate dict with different insertion order
+            task.env_vars = {"KEY3": "value3", "KEY1": "value1", "KEY2": 
"value2"}
+
+        serialized_dag_2 = SerializedDAG.to_dict(dag2)
+
+        # Verify that the original env_vars have different ordering
+        env_vars_1 = None
+        env_vars_2 = None
+        for task in serialized_dag_1["dag"]["tasks"]:
+            if task["__var"]["task_id"] == "task1":
+                env_vars_1 = task["__var"].get("env_vars")
+        for task in serialized_dag_2["dag"]["tasks"]:
+            if task["__var"]["task_id"] == "task1":
+                env_vars_2 = task["__var"].get("env_vars")
+
+        assert env_vars_1 is not None, "serialized_dag_1 should have env_vars"
+        assert env_vars_2 is not None, "serialized_dag_2 should have env_vars"
+        # The serialized env_vars should be sorted dicts (or strings if 
truncated)
+        # If they're dicts, verify they're sorted; if strings, they should be 
equal due to sorting
+        if isinstance(env_vars_1, dict) and isinstance(env_vars_2, dict):
+            # Both should be sorted dictionaries with same content
+            assert list(env_vars_1.keys()) == sorted(env_vars_1.keys())
+            assert list(env_vars_2.keys()) == sorted(env_vars_2.keys())
+            assert env_vars_1 == env_vars_2, "Sorted dicts should be equal 
regardless of original order"
+        elif isinstance(env_vars_1, str) and isinstance(env_vars_2, str):
+            # If truncated to strings, they should be equal due to sorting
+            assert env_vars_1 == env_vars_2, "String representations should be 
equal due to sorting"
+
+        hash_1 = SDM.hash(serialized_dag_1)
+        hash_2 = SDM.hash(serialized_dag_2)
+
+        # Hashes should be identical
+        assert hash_1 == hash_2, "Hashes should be identical when dicts are 
sorted consistently"
+
     def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
         """
         Test that dynamic DAG update gracefully handles case where 
SerializedDagModel doesn't exist.

Reply via email to