This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c76d57a67f Fix hashing of dag_dependencies in serialized dag (#32037)
c76d57a67f is described below
commit c76d57a67f1091e49cc9a8255054970e05ab1de5
Author: Daniel Standish <[email protected]>
AuthorDate: Tue Jun 20 20:41:58 2023 -0700
Fix hashing of dag_dependencies in serialized dag (#32037)
---
airflow/serialization/serialized_objects.py | 4 +--
tests/models/test_serialized_dag.py | 46 ++++++++++++++++++++++++++++-
2 files changed, 47 insertions(+), 3 deletions(-)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index 0f319a1afb..3a528e9c8a 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -1238,7 +1238,7 @@ class SerializedDAG(DAG, BaseSerialization):
for dep in SerializedBaseOperator.detect_dependencies(task)
}
dag_deps.update(DependencyDetector.detect_dag_dependencies(dag))
- serialized_dag["dag_dependencies"] = [x.__dict__ for x in dag_deps]
+ serialized_dag["dag_dependencies"] = [x.__dict__ for x in sorted(dag_deps)]
serialized_dag["_task_group"] = TaskGroupSerialization.serialize_task_group(dag.task_group)
# Edge info in the JSON exactly matches our internal structure
@@ -1444,7 +1444,7 @@ class TaskGroupSerialization(BaseSerialization):
return group
-@dataclass(frozen=True)
+@dataclass(frozen=True, order=True)
class DagDependency:
"""Dataclass for representing dependencies between DAGs.
These are calculated during serialization and attached to serialized DAGs.
diff --git a/tests/models/test_serialized_dag.py b/tests/models/test_serialized_dag.py
index b425cd8f65..61be12676f 100644
--- a/tests/models/test_serialized_dag.py
+++ b/tests/models/test_serialized_dag.py
@@ -20,13 +20,17 @@ from __future__ import annotations
from unittest import mock
+import pendulum
import pytest
-from airflow import DAG, example_dags as example_dags_module
+from airflow import DAG, Dataset, example_dags as example_dags_module
from airflow.models import DagBag
from airflow.models.dagcode import DagCode
from airflow.models.serialized_dag import SerializedDagModel as SDM
+from airflow.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG
+from airflow.settings import json
+from airflow.utils.hashlib_wrapper import md5
from airflow.utils.session import create_session
from tests.test_utils import db
from tests.test_utils.asserts import assert_queries_count
@@ -196,3 +200,43 @@ class TestSerializedDagModel:
expected_dependencies = {dag_id: [] for dag_id in example_dags}
assert SDM.get_dag_dependencies() == expected_dependencies
+
+ def test_order_of_deps_is_consistent(self):
+ """
+ Previously the 'dag_dependencies' node in serialized dag was converted to list from set.
+ This caused the order, and thus the hash value, to be unreliable, which could produce
+ excessive dag parsing.
+ """
+ first_dag_hash = None
+ for r in range(10):
+ with DAG(
+ dag_id="example",
+ start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
+ schedule=[
+ Dataset("1"),
+ Dataset("2"),
+ Dataset("3"),
+ Dataset("4"),
+ Dataset("5"),
+ ],
+ ) as dag6:
+ BashOperator(
+ task_id="any",
+ outlets=[Dataset("0*"), Dataset("6*")],
+ bash_command="sleep 5",
+ )
+ deps_order = [x["dependency_id"] for x in SerializedDAG.serialize_dag(dag6)["dag_dependencies"]]
+ # in below assert, 0 and 6 both come at end because "source" is different for them and source
+ # is the first field in DagDependency class
+ assert deps_order == ["1", "2", "3", "4", "5", "0*", "6*"]
+
+ # for good measure, let's check that the dag hash is consistent
+ dag_json = json.dumps(SerializedDAG.to_dict(dag6), sort_keys=True).encode("utf-8")
+ this_dag_hash = md5(dag_json).hexdigest()
+
+ # set first dag hash on first pass
+ if first_dag_hash is None:
+ first_dag_hash = this_dag_hash
+
+ # dag hash should not change without change in structure (we're in a loop)
+ assert this_dag_hash == first_dag_hash