This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d46a9d12065 Fix inconsistent Dag hashes when template fields contain
unordered dicts (#59091)
d46a9d12065 is described below
commit d46a9d120657d7fa0c70477d961120024fecd893
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Sun Dec 7 22:09:32 2025 +0100
Fix inconsistent Dag hashes when template fields contain unordered dicts
(#59091)
Dags using operators with dictionary values in template_fields (such as
env_vars) were getting different hashes on each parse, even when the
actual dictionary content was unchanged. This happened because
serialize_template_field converts dictionaries to a string
using str(), which preserves insertion order. When dictionary ordering
varies between parses (e.g., when env_vars comes from os.environ.copy()),
the string representation differs, causing inconsistent hashing.
This change prevents unnecessary Dag updates and reprocessing when only
dictionary ordering differs in template fields.
---
airflow-core/src/airflow/serialization/helpers.py | 14 ++++++
.../tests/unit/models/test_serialized_dag.py | 53 ++++++++++++++++++++++
2 files changed, 67 insertions(+)
diff --git a/airflow-core/src/airflow/serialization/helpers.py
b/airflow-core/src/airflow/serialization/helpers.py
index e9d90fe61ae..3af7f3c07e7 100644
--- a/airflow-core/src/airflow/serialization/helpers.py
+++ b/airflow-core/src/airflow/serialization/helpers.py
@@ -55,6 +55,16 @@ def serialize_template_field(template_field: Any, name: str)
-> str | dict | lis
return {key: translate_tuples_to_lists(value) for key, value in
obj.items()}
return obj
+ def sort_dict_recursively(obj: Any) -> Any:
+ """Recursively sort dictionaries to ensure consistent ordering."""
+ if isinstance(obj, dict):
+ return {k: sort_dict_recursively(v) for k, v in
sorted(obj.items())}
+ if isinstance(obj, list):
+ return [sort_dict_recursively(item) for item in obj]
+ if isinstance(obj, tuple):
+ return tuple(sort_dict_recursively(item) for item in obj)
+ return obj
+
max_length = conf.getint("core", "max_templated_field_length")
if not is_jsonable(template_field):
@@ -74,6 +84,10 @@ def serialize_template_field(template_field: Any, name: str)
-> str | dict | lis
# and need to be converted to lists
return template_field
template_field = translate_tuples_to_lists(template_field)
+ # Sort dictionaries recursively to ensure consistent string representation
+ # This prevents hash inconsistencies when dict ordering varies
+ if isinstance(template_field, dict):
+ template_field = sort_dict_recursively(template_field)
serialized = str(template_field)
if len(serialized) > max_length:
rendered = redact(serialized, name)
diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py
b/airflow-core/tests/unit/models/test_serialized_dag.py
index 65877061dae..2472c072046 100644
--- a/airflow-core/tests/unit/models/test_serialized_dag.py
+++ b/airflow-core/tests/unit/models/test_serialized_dag.py
@@ -554,6 +554,59 @@ class TestSerializedDagModel:
assert "fileloc" in test_data["dag"]
assert test_data["dag"]["fileloc"] == "/different/path/to/dag.py"
+ def
test_hash_method_consistent_with_dict_ordering_in_template_fields(self,
dag_maker):
+ from airflow.sdk.bases.operator import BaseOperator
+
+ class MyCustomOp(BaseOperator):
+ template_fields = ("env_vars",)
+
+ def __init__(self, *, task_id: str, **kwargs):
+ super().__init__(task_id=task_id, **kwargs)
+ self.env_vars = {"KEY1": "value1", "KEY2": "value2", "KEY3":
"value3"}
+
+ # Create first DAG with env_vars in one order
+ with dag_maker("test_dag") as dag1:
+ MyCustomOp(task_id="task1")
+
+ serialized_dag_1 = SerializedDAG.to_dict(dag1)
+
+ # Create second DAG with env_vars in different order
+ with dag_maker("test_dag") as dag2:
+ task = MyCustomOp(task_id="task1")
+ # Recreate dict with different insertion order
+ task.env_vars = {"KEY3": "value3", "KEY1": "value1", "KEY2":
"value2"}
+
+ serialized_dag_2 = SerializedDAG.to_dict(dag2)
+
+ # Verify that the original env_vars have different ordering
+ env_vars_1 = None
+ env_vars_2 = None
+ for task in serialized_dag_1["dag"]["tasks"]:
+ if task["__var"]["task_id"] == "task1":
+ env_vars_1 = task["__var"].get("env_vars")
+ for task in serialized_dag_2["dag"]["tasks"]:
+ if task["__var"]["task_id"] == "task1":
+ env_vars_2 = task["__var"].get("env_vars")
+
+ assert env_vars_1 is not None, "serialized_dag_1 should have env_vars"
+ assert env_vars_2 is not None, "serialized_dag_2 should have env_vars"
+ # The serialized env_vars should be sorted dicts (or strings if
truncated)
+ # If they're dicts, verify they're sorted; if strings, they should be
equal due to sorting
+ if isinstance(env_vars_1, dict) and isinstance(env_vars_2, dict):
+ # Both should be sorted dictionaries with same content
+ assert list(env_vars_1.keys()) == sorted(env_vars_1.keys())
+ assert list(env_vars_2.keys()) == sorted(env_vars_2.keys())
+ assert env_vars_1 == env_vars_2, "Sorted dicts should be equal
regardless of original order"
+ elif isinstance(env_vars_1, str) and isinstance(env_vars_2, str):
+ # If truncated to strings, they should be equal due to sorting
+ assert env_vars_1 == env_vars_2, "String representations should be
equal due to sorting"
+
+ hash_1 = SDM.hash(serialized_dag_1)
+ hash_2 = SDM.hash(serialized_dag_2)
+
+ # Hashes should be identical
+ assert hash_1 == hash_2, "Hashes should be identical when dicts are
sorted consistently"
+
def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
"""
Test that dynamic DAG update gracefully handles case where
SerializedDagModel doesn't exist.