dstandish commented on a change in pull request #19683:
URL: https://github.com/apache/airflow/pull/19683#discussion_r763222713
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,66 @@ def test_deps_sorted(self):
'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
]
+ def test_derived_dag_deps(self):
+ """
+ Tests DAG dependency detection, including derived classes
+ """
+ from airflow.operators.dummy import DummyOperator
+ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ class DerivedSensor(ExternalTaskSensor):
+ template_fields = ['external_dag_id', 'run_id']
+
+ def __init__(self, *, external_dag_id: str, run_id: str = None,
**kwargs):
+ super().__init__(
+ external_dag_id=external_dag_id, execution_delta=None,
execution_date_fn=None, **kwargs
+ )
+ self.run_id = run_id
Review comment:
```suggestion
class DerivedSensor(ExternalTaskSensor):
pass
```
Would this not accomplish the same thing? Is there a reason we need to add
the other stuff for this test?
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,66 @@ def test_deps_sorted(self):
'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
]
+ def test_derived_dag_deps(self):
+ """
+ Tests DAG dependency detection, including derived classes
+ """
+ from airflow.operators.dummy import DummyOperator
+ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ class DerivedSensor(ExternalTaskSensor):
+ template_fields = ['external_dag_id', 'run_id']
+
+ def __init__(self, *, external_dag_id: str, run_id: str = None,
**kwargs):
+ super().__init__(
+ external_dag_id=external_dag_id, execution_delta=None,
execution_date_fn=None, **kwargs
+ )
+ self.run_id = run_id
+
+ execution_date = datetime(2020, 1, 1)
+ for class_ in [ExternalTaskSensor, DerivedSensor]:
+ with DAG(dag_id="test_derived_dag_deps_sensor",
start_date=execution_date) as dag:
+ task1 = class_(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = DummyOperator(task_id="task2")
+ task1 >> task2
+
+ dag = SerializedDAG.to_dict(dag)
+ assert dag['dag']['dag_dependencies'] == [
+ {
+ 'source': 'external_dag_id',
+ 'target': 'test_derived_dag_deps_sensor',
+ 'dependency_type': 'sensor',
+ 'dependency_id': 'task1',
+ }
+ ]
+
+ class DerivedOperator(TriggerDagRunOperator):
Review comment:
Could you please split this test into two tests:
`test_derived_dag_deps_sensor` and `test_derived_dag_deps_operator`?
There's not really any shared logic or interaction, so separating would help
readability.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]