jedcunningham commented on code in PR #25521:
URL: https://github.com/apache/airflow/pull/25521#discussion_r937186897


##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].

Review Comment:
   ```suggestion
           Prior to deprecation of custom dependency detector, the return type was Optional[DagDependency].
   ```



##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].
+        This class verifies that custom dependency detector classes which assume that return type will still
+        work until support for them is removed in 3.0.
+
+        TODO: remove in Airflow 3.0
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2

Review Comment:
   ```suggestion
   ```
   
   Not sure we need these for this test?



##########
airflow/serialization/serialized_objects.py:
##########
@@ -1048,14 +1058,11 @@ def serialize_dag(cls, dag: DAG) -> dict:
                 del serialized_dag["timetable"]
 
             serialized_dag["tasks"] = [cls._serialize(task) for _, task in dag.task_dict.items()]
-            dag_deps = [
-                t.__dict__
-                for task in dag.task_dict.values()
-                for t in SerializedBaseOperator.detect_dependencies(task)
-                if t is not None
-            ]
-
-            serialized_dag["dag_dependencies"] = dag_deps
+            dag_deps = {
+                d for t in dag.task_dict.values() for d in SerializedBaseOperator.detect_dependencies(t)

Review Comment:
   ```suggestion
                   dep for task in dag.task_dict.values() for dep in SerializedBaseOperator.detect_dependencies(task)
   ```
   
   Might be worth using better variable names here.
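   
   To illustrate the renamed comprehension outside of Airflow, here is a minimal sketch. `DagDependency` and `detect_dependencies` below are hypothetical stand-ins (a frozen dataclass and a plain function), not the real Airflow classes; the sketch assumes dependencies are hashable so that the set comprehension can deduplicate them.

```python
from dataclasses import dataclass
from typing import Iterable


# Hypothetical stand-in for Airflow's DagDependency; frozen so instances are hashable.
@dataclass(frozen=True)
class DagDependency:
    source: str
    target: str
    dependency_type: str
    dependency_id: str


# Hypothetical stand-in for SerializedBaseOperator.detect_dependencies.
def detect_dependencies(task: dict) -> Iterable[DagDependency]:
    return task.get("deps", [])


# Toy task_dict: task1 and task3 report the same dependency, task2 reports none.
task_dict = {
    "task1": {"deps": [DagDependency("external_dag_id", "test", "sensor", "task1")]},
    "task2": {"deps": []},
    "task3": {"deps": [DagDependency("external_dag_id", "test", "sensor", "task1")]},
}

# Same shape as the suggestion: `task` and `dep` instead of `t` and `d`.
dag_deps = {dep for task in task_dict.values() for dep in detect_dependencies(task)}
```

   With the longer names, the outer loop (tasks) and the inner loop (dependencies per task) are easy to tell apart at a glance.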



##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].
+        This class verifies that custom dependency detector classes which assume that return type will still
+        work until support for them is removed in 3.0.
+
+        TODO: remove in Airflow 3.0
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2
+            CustomDepOperator(task_id='hello', bash_command='hi')
+            dag = SerializedDAG.to_dict(dag)
+            assert sorted(dag['dag']['dag_dependencies'], key=lambda x: tuple(x.values())) == sorted(
+                [
+                    {
+                        'source': 'external_dag_id',
+                        'target': 'test',
+                        'dependency_type': 'sensor',
+                        'dependency_id': 'task1',
+                    },
+                    {
+                        'source': 'test',
+                        'target': 'nothing',
+                        'dependency_type': 'abc',
+                        'dependency_id': 'hello',
+                    },
+                ],
+                key=lambda x: tuple(x.values()),
+            )
+
+    def test_dag_deps_datasets(self):
+        """
+        Check that dag_dependencies node is populated correctly for a DAG with datasets.
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        d1 = Dataset('d1')
+        d2 = Dataset('d2')
+        d3 = Dataset('d2')

Review Comment:
   Might be worth a different name here; it took me a minute to work out that d3.uri _should_ == "d2".
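   
   A rough sketch of what a more descriptive name could look like, assuming the intent is that two objects sharing a URI count as the same dataset. `Dataset` below is a hypothetical frozen-dataclass stand-in, not Airflow's class, and `d2_same_uri` is only an example name.

```python
from dataclasses import dataclass


# Hypothetical stand-in for Airflow's Dataset; only the URI matters for this sketch.
@dataclass(frozen=True)
class Dataset:
    uri: str


d1 = Dataset('d1')
d2 = Dataset('d2')
# The name says up front that this deliberately reuses d2's URI.
d2_same_uri = Dataset('d2')
d4 = Dataset('d4')

# Distinct objects, equal by URI, so a set keeps only one of them.
unique_datasets = {d1, d2, d2_same_uri, d4}
```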



##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
                 }
             ]
 
+    @conf_vars(
+        {
+            (
+                'scheduler',
+                'dependency_detector',
+            ): 'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+        }
+    )
+    def test_custom_dep_detector(self):
+        """
+        Prior to deprecation of custom dependency detector, the return type as Optional[DagDependency].
+        This class verifies that custom dependency detector classes which assume that return type will still
+        work until support for them is removed in 3.0.
+
+        TODO: remove in Airflow 3.0
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2
+            CustomDepOperator(task_id='hello', bash_command='hi')
+            dag = SerializedDAG.to_dict(dag)
+            assert sorted(dag['dag']['dag_dependencies'], key=lambda x: tuple(x.values())) == sorted(
+                [
+                    {
+                        'source': 'external_dag_id',
+                        'target': 'test',
+                        'dependency_type': 'sensor',
+                        'dependency_id': 'task1',
+                    },
+                    {
+                        'source': 'test',
+                        'target': 'nothing',
+                        'dependency_type': 'abc',
+                        'dependency_id': 'hello',
+                    },
+                ],
+                key=lambda x: tuple(x.values()),
+            )
+
+    def test_dag_deps_datasets(self):
+        """
+        Check that dag_dependencies node is populated correctly for a DAG with datasets.
+        """
+        from airflow.operators.empty import EmptyOperator
+        from airflow.sensors.external_task import ExternalTaskSensor
+
+        d1 = Dataset('d1')
+        d2 = Dataset('d2')
+        d3 = Dataset('d2')
+        d4 = Dataset('d4')
+        execution_date = datetime(2020, 1, 1)
+        with DAG(dag_id="test", start_date=execution_date, schedule_on=[d1]) as dag:
+            task1 = ExternalTaskSensor(
+                task_id="task1",
+                external_dag_id="external_dag_id",
+                mode="reschedule",
+            )
+            task2 = EmptyOperator(task_id="task2")
+            task1 >> task2

Review Comment:
   ```suggestion
   ```
   
   Same here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.