This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a6d3471cd test(serialization): add test case test_dag_deps_datasets_with_duplicate_dataset to verify that duplicate datasets in schedule and outlet of a DAG are deduplicated (#40984)
2a6d3471cd is described below

commit 2a6d3471cdcbaa96237acebc3a3476a084f5f358
Author: Wei Lee <[email protected]>
AuthorDate: Wed Jul 24 16:02:30 2024 +0800

    test(serialization): add test case test_dag_deps_datasets_with_duplicate_dataset to verify that duplicate datasets in schedule and outlet of a DAG are deduplicated (#40984)
---
 tests/serialization/test_dag_serialization.py | 65 +++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index b2929e8c27..65c4650c7a 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -1736,6 +1736,71 @@ class TestStringifiedDAGs:
                 key=lambda x: tuple(x.values()),
             )
 
+    @pytest.mark.db_test
+    def test_dag_deps_datasets_with_duplicate_dataset(self):
+        """
+        Check that dag_dependencies node is populated correctly for a DAG with duplicate datasets.
+        """
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        d1 = Dataset("d1")
+        d2 = Dataset("d2")
+        d3 = Dataset("d3")
+        d4 = Dataset("d4")
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date, schedule=[d1, d1, d1, d1, d1]) as dag:
+            ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            BashOperator(task_id="dataset_writer", bash_command="echo hello", outlets=[d2, d2, d2, d3])
+
+            @dag.task(outlets=[d4])
+            def other_dataset_writer(x):
+                pass
+
+            other_dataset_writer.expand(x=[1, 2])
+
+        dag = SerializedDAG.to_dict(dag)
+        actual = sorted(dag["dag"]["dag_dependencies"], key=lambda x: tuple(x.values()))
+        expected = sorted(
+            [
+                {
+                    "source": "test",
+                    "target": "dataset",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d4",
+                },
+                {
+                    "source": "external_dag_id",
+                    "target": "test",
+                    "dependency_type": "sensor",
+                    "dependency_id": "task1",
+                },
+                {
+                    "source": "test",
+                    "target": "dataset",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d3",
+                },
+                {
+                    "source": "test",
+                    "target": "dataset",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d2",
+                },
+                {
+                    "source": "dataset",
+                    "target": "test",
+                    "dependency_type": "dataset",
+                    "dependency_id": "d1",
+                },
+            ],
+            key=lambda x: tuple(x.values()),
+        )
+        assert actual == expected
+
     @pytest.mark.db_test
     def test_dag_deps_datasets(self):
         """

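For readers without the full test file at hand, a minimal sketch of the behaviour the commit verifies: duplicate Dataset references in a DAG's schedule and in task outlets should collapse to single dag_dependencies entries after serialization. This is an illustrative example, not part of the commit; it assumes an Airflow 2.x environment where the Dataset API and SerializedDAG are importable as shown, and the dag_id, task_id, and dataset name are placeholders.

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.operators.bash import BashOperator
from airflow.serialization.serialized_objects import SerializedDAG

d1 = Dataset("d1")  # illustrative dataset name

# The same Dataset is referenced several times in both the schedule and the
# task outlets.
with DAG(dag_id="demo", start_date=datetime(2020, 1, 1), schedule=[d1, d1]) as dag:
    BashOperator(task_id="writer", bash_command="echo hello", outlets=[d1, d1])

# After serialization, dag_dependencies should contain one upstream entry
# (dataset -> dag) and one downstream entry (dag -> dataset) for "d1",
# rather than one entry per duplicate reference.
deps = SerializedDAG.to_dict(dag)["dag"]["dag_dependencies"]
print(deps)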