This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-6-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 2813c41c1b93ea020d5788d4bb08c775abb6b90f
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jun 20 20:41:58 2023 -0700

    Fix hashing of dag_dependencies in serialized dag (#32037)
    
    (cherry picked from commit c76d57a67f1091e49cc9a8255054970e05ab1de5)
---
 airflow/serialization/serialized_objects.py |  4 +--
 tests/models/test_serialized_dag.py         | 46 ++++++++++++++++++++++++++++-
 2 files changed, 47 insertions(+), 3 deletions(-)

diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 04c427309c..058da502a8 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1240,7 +1240,7 @@ class SerializedDAG(DAG, BaseSerialization):
                 for dep in SerializedBaseOperator.detect_dependencies(task)
             }
             dag_deps.update(DependencyDetector.detect_dag_dependencies(dag))
-            serialized_dag["dag_dependencies"] = [x.__dict__ for x in dag_deps]
+            serialized_dag["dag_dependencies"] = [x.__dict__ for x in 
sorted(dag_deps)]
             serialized_dag["_task_group"] = 
TaskGroupSerialization.serialize_task_group(dag.task_group)
 
             # Edge info in the JSON exactly matches our internal structure
@@ -1446,7 +1446,7 @@ class TaskGroupSerialization(BaseSerialization):
         return group
 
 
-@dataclass(frozen=True)
+@dataclass(frozen=True, order=True)
 class DagDependency:
     """Dataclass for representing dependencies between DAGs.
     These are calculated during serialization and attached to serialized DAGs.
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index b425cd8f65..61be12676f 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -20,13 +20,17 @@ from __future__ import annotations
 
 from unittest import mock
 
+import pendulum
 import pytest
 
-from airflow import DAG, example_dags as example_dags_module
+from airflow import DAG, Dataset, example_dags as example_dags_module
 from airflow.models import DagBag
 from airflow.models.dagcode import DagCode
 from airflow.models.serialized_dag import SerializedDagModel as SDM
+from airflow.operators.bash import BashOperator
 from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.settings import json
+from airflow.utils.hashlib_wrapper import md5
 from airflow.utils.session import create_session
 from tests.test_utils import db
 from tests.test_utils.asserts import assert_queries_count
@@ -196,3 +200,43 @@ class TestSerializedDagModel:
 
         expected_dependencies = {dag_id: [] for dag_id in example_dags}
         assert SDM.get_dag_dependencies() == expected_dependencies
+
+    def test_order_of_deps_is_consistent(self):
+        """
+        Previously the 'dag_dependencies' node in serialized dag was converted 
to list from set.
+        This caused the order, and thus the hash value, to be unreliable, 
which could produce
+        excessive dag parsing.
+        """
+        first_dag_hash = None
+        for r in range(10):
+            with DAG(
+                dag_id="example",
+                start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+                schedule=[
+                    Dataset("1"),
+                    Dataset("2"),
+                    Dataset("3"),
+                    Dataset("4"),
+                    Dataset("5"),
+                ],
+            ) as dag6:
+                BashOperator(
+                    task_id="any",
+                    outlets=[Dataset("0*"), Dataset("6*")],
+                    bash_command="sleep 5",
+                )
+            deps_order = [x["dependency_id"] for x in 
SerializedDAG.serialize_dag(dag6)["dag_dependencies"]]
+            # in below assert, 0 and 6 both come at end because "source" is 
different for them and source
+            # is the first field in DagDependency class
+            assert deps_order == ["1", "2", "3", "4", "5", "0*", "6*"]
+
+            # for good measure, let's check that the dag hash is consistent
+            dag_json = json.dumps(SerializedDAG.to_dict(dag6), 
sort_keys=True).encode("utf-8")
+            this_dag_hash = md5(dag_json).hexdigest()
+
+            # set first dag hash on first pass
+            if first_dag_hash is None:
+                first_dag_hash = this_dag_hash
+
+            # dag hash should not change without change in structure (we're in 
a loop)
+            assert this_dag_hash == first_dag_hash

Reply via email to