This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 2a6d3471cd test(serialization): add test case test_dag_deps_datasets_with_duplicate_dataset to verify that duplicate datasets in the schedule and outlets of a DAG are deduplicated (#40984)
2a6d3471cd is described below
commit 2a6d3471cdcbaa96237acebc3a3476a084f5f358
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jul 24 16:02:30 2024 +0800
test(serialization): add test case test_dag_deps_datasets_with_duplicate_dataset to verify that duplicate datasets in the schedule and outlets of a DAG are deduplicated (#40984)
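For context, here is a minimal sketch of the deduplication behavior this test pins down. It is illustrative only: the dag_id "example_dedup" is hypothetical, while Dataset, DAG, and SerializedDAG.to_dict are used exactly as in the diff below, and the expected output follows the test's assertion that repeated datasets collapse to a single dependency entry.

    from datetime import datetime

    from airflow import DAG
    from airflow.datasets import Dataset
    from airflow.serialization.serialized_objects import SerializedDAG

    d1 = Dataset("d1")
    # d1 appears three times in the schedule, but the serialized
    # dag_dependencies should list it only once.
    with DAG(dag_id="example_dedup", start_date=datetime(2020, 1, 1), schedule=[d1, d1, d1]) as dag:
        pass

    serialized = SerializedDAG.to_dict(dag)
    print(serialized["dag"]["dag_dependencies"])
    # Expected, per the test's assertions (one entry, not three):
    # [{'source': 'dataset', 'target': 'example_dedup',
    #   'dependency_type': 'dataset', 'dependency_id': 'd1'}]

The same collapsing applies to task outlets: the test below declares d2 three times in an operator's outlets and expects a single "d2" dependency. To run just the new case, an invocation along these lines should work:

    pytest tests/serialization/test_dag_serialization.py::TestStringifiedDAGs::test_dag_deps_datasets_with_duplicate_dataset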
---
tests/serialization/test_dag_serialization.py | 65 +++++++++++++++++++++++++++
1 file changed, 65 insertions(+)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index b2929e8c27..65c4650c7a 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1736,6 +1736,71 @@ class TestStringifiedDAGs:
             key=lambda x: tuple(x.values()),
         )
 
+    @pytest.mark.db_test
+    def test_dag_deps_datasets_with_duplicate_dataset(self):
+        """
+        Check that dag_dependencies node is populated correctly for a DAG with duplicate datasets.
+        """
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        d1 = Dataset("d1")
+        d2 = Dataset("d2")
+        d3 = Dataset("d3")
+        d4 = Dataset("d4")
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date, schedule=[d1, d1, d1, d1, d1]) as dag:
+            ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            BashOperator(task_id="dataset_writer", bash_command="echo hello", outlets=[d2, d2, d2, d3])
+
+            @dag.task(outlets=[d4])
+            def other_dataset_writer(x):
+                pass
+
+            other_dataset_writer.expand(x=[1, 2])
+
+        dag = SerializedDAG.to_dict(dag)
+        actual = sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values()))
+        expected = sorted(
+            [
+                {
+                    "source": "test",
+                    "target": "dataset",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d4",
+                },
+                {
+                    "source": "external_dag_id",
+                    "target": "test",
+                    "dependency_type": "sensor",
+                    "dependency_id": "task1",
+                },
+                {
+                    "source": "test",
+                    "target": "dataset",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d3",
+                },
+                {
+                    "source": "test",
+                    "target": "dataset",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d2",
+                },
+                {
+                    "source": "dataset",
+                    "target": "test",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d1",
+                },
+            ],
+            key=lambda x: tuple(x.values()),
+        )
+        assert actual == expected
+
     @pytest.mark.db_test
     def test_dag_deps_datasets(self):
         """