This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 78b875f3d4b Sort children when deserializing DAG (#48534)
78b875f3d4b is described below
commit 78b875f3d4b2234be858adabcd0f4830272d80d4
Author: Jed Cunningham <[email protected]>
AuthorDate: Sun Mar 30 14:50:56 2025 -0600
Sort children when deserializing DAG (#48534)
MySQL doesn't preserve the order of a task group's children across a
serialization round trip, so sorting them on deserialization keeps the
deserialized DAGs consistent across database types.
---
airflow-core/src/airflow/serialization/serialized_objects.py | 2 +-
.../tests/unit/serialization/test_serialized_objects.py | 11 +++++++++++
2 files changed, 12 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py
b/airflow-core/src/airflow/serialization/serialized_objects.py
index 6af066c12d1..ec5ebf55d24 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1865,7 +1865,7 @@ class TaskGroupSerialization(BaseSerialization):
if _type == DAT.OP
else cls.deserialize_task_group(val, group, task_dict, dag=dag)
)
- for label, (_type, val) in encoded_group["children"].items()
+ for label, (_type, val) in
sorted(encoded_group["children"].items())
}
group.upstream_group_ids.update(cls.deserialize(encoded_group["upstream_group_ids"]))
group.downstream_group_ids.update(cls.deserialize(encoded_group["downstream_group_ids"]))
diff --git a/airflow-core/tests/unit/serialization/test_serialized_objects.py
b/airflow-core/tests/unit/serialization/test_serialized_objects.py
index 22265ba9eee..ec3a2256cc3 100644
--- a/airflow-core/tests/unit/serialization/test_serialized_objects.py
+++ b/airflow-core/tests/unit/serialization/test_serialized_objects.py
@@ -139,6 +139,12 @@ DAG_RUN = DagRun(
DAG_RUN.id = 1
+# we add the tasks out of order, to ensure they are deserialized in the correct order
+DAG_WITH_TASKS = DAG(dag_id="test_dag", start_date=datetime.now())
+EmptyOperator(task_id="task2", dag=DAG_WITH_TASKS)
+EmptyOperator(task_id="task1", dag=DAG_WITH_TASKS)
+
+
def create_outlet_event_accessors(
key: Asset | AssetAlias, extra: dict, asset_alias_events:
list[AssetAliasEvent]
) -> OutletEventAccessors:
@@ -303,6 +309,11 @@ class MockLazySelectSequence(LazySelectSequence):
DAT.AIRFLOW_EXC_SER,
equal_exception,
),
+ (
+ DAG_WITH_TASKS,
+ DAT.DAG,
+ lambda _, b: list(b.task_group.children.keys()) ==
sorted(b.task_group.children.keys()),
+ ),
],
)
def test_serialize_deserialize(input, encoded_type, cmp_func):