jedcunningham commented on code in PR #25521:
URL: https://github.com/apache/airflow/pull/25521#discussion_r937186897
##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
}
]
+ @conf_vars(
+ {
+ (
+ 'scheduler',
+ 'dependency_detector',
+ ):
'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+ }
+ )
+ def test_custom_dep_detector(self):
+ """
+ Prior to deprecation of custom dependency detector, the return type as
Optional[DagDependency].
Review Comment:
```suggestion
Prior to deprecation of custom dependency detector, the return type
was Optional[DagDependency].
```
##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
}
]
+ @conf_vars(
+ {
+ (
+ 'scheduler',
+ 'dependency_detector',
+ ):
'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+ }
+ )
+ def test_custom_dep_detector(self):
+ """
+ Prior to deprecation of custom dependency detector, the return type as
Optional[DagDependency].
+ This class verifies that custom dependency detector classes which
assume that return type will still
+ work until support for them is removed in 3.0.
+
+ TODO: remove in Airflow 3.0
+ """
+ from airflow.operators.empty import EmptyOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ execution_date = datetime(2020, 1, 1)
+ with DAG(dag_id="test", start_date=execution_date) as dag:
+ task1 = ExternalTaskSensor(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = EmptyOperator(task_id="task2")
+ task1 >> task2
Review Comment:
```suggestion
```
Not sure we need these for this test?
##########
airflow/serialization/serialized_objects.py:
##########
@@ -1048,14 +1058,11 @@ def serialize_dag(cls, dag: DAG) -> dict:
del serialized_dag["timetable"]
serialized_dag["tasks"] = [cls._serialize(task) for _, task in
dag.task_dict.items()]
- dag_deps = [
- t.__dict__
- for task in dag.task_dict.values()
- for t in SerializedBaseOperator.detect_dependencies(task)
- if t is not None
- ]
-
- serialized_dag["dag_dependencies"] = dag_deps
+ dag_deps = {
+ d for t in dag.task_dict.values() for d in
SerializedBaseOperator.detect_dependencies(t)
Review Comment:
```suggestion
dep for task in dag.task_dict.values() for dep in
SerializedBaseOperator.detect_dependencies(task)
```
Might be worth using better variable names here.
##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
}
]
+ @conf_vars(
+ {
+ (
+ 'scheduler',
+ 'dependency_detector',
+ ):
'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+ }
+ )
+ def test_custom_dep_detector(self):
+ """
+ Prior to deprecation of custom dependency detector, the return type as
Optional[DagDependency].
+ This class verifies that custom dependency detector classes which
assume that return type will still
+ work until support for them is removed in 3.0.
+
+ TODO: remove in Airflow 3.0
+ """
+ from airflow.operators.empty import EmptyOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ execution_date = datetime(2020, 1, 1)
+ with DAG(dag_id="test", start_date=execution_date) as dag:
+ task1 = ExternalTaskSensor(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = EmptyOperator(task_id="task2")
+ task1 >> task2
+ CustomDepOperator(task_id='hello', bash_command='hi')
+ dag = SerializedDAG.to_dict(dag)
+ assert sorted(dag['dag']['dag_dependencies'], key=lambda x:
tuple(x.values())) == sorted(
+ [
+ {
+ 'source': 'external_dag_id',
+ 'target': 'test',
+ 'dependency_type': 'sensor',
+ 'dependency_id': 'task1',
+ },
+ {
+ 'source': 'test',
+ 'target': 'nothing',
+ 'dependency_type': 'abc',
+ 'dependency_id': 'hello',
+ },
+ ],
+ key=lambda x: tuple(x.values()),
+ )
+
+ def test_dag_deps_datasets(self):
+ """
+ Check that dag_dependencies node is populated correctly for a DAG with
datasets.
+ """
+ from airflow.operators.empty import EmptyOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ d1 = Dataset('d1')
+ d2 = Dataset('d2')
+ d3 = Dataset('d2')
Review Comment:
Might be worth a different name here; it took me a minute to work out that
d3.uri _should_ == "d2".
##########
tests/serialization/test_dag_serialization.py:
##########
@@ -1329,6 +1367,110 @@ class DerivedSensor(ExternalTaskSensor):
}
]
+ @conf_vars(
+ {
+ (
+ 'scheduler',
+ 'dependency_detector',
+ ):
'tests.serialization.test_dag_serialization.CustomDependencyDetector'
+ }
+ )
+ def test_custom_dep_detector(self):
+ """
+ Prior to deprecation of custom dependency detector, the return type as
Optional[DagDependency].
+ This class verifies that custom dependency detector classes which
assume that return type will still
+ work until support for them is removed in 3.0.
+
+ TODO: remove in Airflow 3.0
+ """
+ from airflow.operators.empty import EmptyOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ execution_date = datetime(2020, 1, 1)
+ with DAG(dag_id="test", start_date=execution_date) as dag:
+ task1 = ExternalTaskSensor(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = EmptyOperator(task_id="task2")
+ task1 >> task2
+ CustomDepOperator(task_id='hello', bash_command='hi')
+ dag = SerializedDAG.to_dict(dag)
+ assert sorted(dag['dag']['dag_dependencies'], key=lambda x:
tuple(x.values())) == sorted(
+ [
+ {
+ 'source': 'external_dag_id',
+ 'target': 'test',
+ 'dependency_type': 'sensor',
+ 'dependency_id': 'task1',
+ },
+ {
+ 'source': 'test',
+ 'target': 'nothing',
+ 'dependency_type': 'abc',
+ 'dependency_id': 'hello',
+ },
+ ],
+ key=lambda x: tuple(x.values()),
+ )
+
+ def test_dag_deps_datasets(self):
+ """
+ Check that dag_dependencies node is populated correctly for a DAG with
datasets.
+ """
+ from airflow.operators.empty import EmptyOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ d1 = Dataset('d1')
+ d2 = Dataset('d2')
+ d3 = Dataset('d2')
+ d4 = Dataset('d4')
+ execution_date = datetime(2020, 1, 1)
+ with DAG(dag_id="test", start_date=execution_date, schedule_on=[d1])
as dag:
+ task1 = ExternalTaskSensor(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = EmptyOperator(task_id="task2")
+ task1 >> task2
Review Comment:
```suggestion
```
Same here — not sure we need these for this test.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]