dstandish commented on a change in pull request #19683:
URL: https://github.com/apache/airflow/pull/19683#discussion_r758624943
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
]
+ def test_derived_dag_deps(self):
+ """
+ Tests DAG dependency detection, including derived classes
+ """
+ from airflow.operators.dummy import DummyOperator
+ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ class DerivedSensor(ExternalTaskSensor):
+ template_fields = ['external_dag_id', 'run_id']
+
+ def __init__(self, *, external_dag_id: str, run_id: str = None,
**kwargs):
+ super().__init__(
+ external_dag_id=external_dag_id, execution_delta=None,
execution_date_fn=None, **kwargs
+ )
+ self.run_id = run_id
+
+ execution_date = datetime(2020, 1, 1)
+ for Sensor in [ExternalTaskSensor, DerivedSensor]:
+ with DAG(dag_id="test_derived_dag_deps_sensor",
start_date=execution_date) as dag:
+ task1 = Sensor(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = DummyOperator(task_id="task2")
+ task1 >> task2
+
+ dag = SerializedDAG.to_dict(dag)
+ assert dag['dag']['dag_dependencies'] == [
+ {
+ 'source': 'external_dag_id',
+ 'target': 'test_derived_dag_deps_sensor',
+ 'dependency_type': 'sensor',
+ 'dependency_id': 'task1',
+ }
+ ]
+
+ class DerivedTrigger(TriggerDagRunOperator):
Review comment:
Please call this `DerivedOperator` instead of `DerivedTrigger`.
`Trigger` is already a distinct concept in Airflow, so it is best not to use
the name here (it could be confusing, and produce false positives in search).
```suggestion
class DerivedOperator(TriggerDagRunOperator):
```
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
]
+ def test_derived_dag_deps(self):
+ """
+ Tests DAG dependency detection, including derived classes
+ """
+ from airflow.operators.dummy import DummyOperator
+ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ class DerivedSensor(ExternalTaskSensor):
+ template_fields = ['external_dag_id', 'run_id']
+
+ def __init__(self, *, external_dag_id: str, run_id: str = None,
**kwargs):
+ super().__init__(
+ external_dag_id=external_dag_id, execution_delta=None,
execution_date_fn=None, **kwargs
+ )
+ self.run_id = run_id
+
+ execution_date = datetime(2020, 1, 1)
+ for Sensor in [ExternalTaskSensor, DerivedSensor]:
Review comment:
```suggestion
for class_ in [ExternalTaskSensor, DerivedSensor]:
```
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
]
+ def test_derived_dag_deps(self):
+ """
+ Tests DAG dependency detection, including derived classes
+ """
+ from airflow.operators.dummy import DummyOperator
+ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ class DerivedSensor(ExternalTaskSensor):
+ template_fields = ['external_dag_id', 'run_id']
+
+ def __init__(self, *, external_dag_id: str, run_id: str = None,
**kwargs):
+ super().__init__(
+ external_dag_id=external_dag_id, execution_delta=None,
execution_date_fn=None, **kwargs
+ )
+ self.run_id = run_id
+
+ execution_date = datetime(2020, 1, 1)
+ for Sensor in [ExternalTaskSensor, DerivedSensor]:
+ with DAG(dag_id="test_derived_dag_deps_sensor",
start_date=execution_date) as dag:
+ task1 = Sensor(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = DummyOperator(task_id="task2")
+ task1 >> task2
+
+ dag = SerializedDAG.to_dict(dag)
+ assert dag['dag']['dag_dependencies'] == [
+ {
+ 'source': 'external_dag_id',
+ 'target': 'test_derived_dag_deps_sensor',
+ 'dependency_type': 'sensor',
+ 'dependency_id': 'task1',
+ }
+ ]
+
+ class DerivedTrigger(TriggerDagRunOperator):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
+
+ for Trigger in [TriggerDagRunOperator, DerivedTrigger]:
Review comment:
```suggestion
for class_ in [TriggerDagRunOperator, DerivedTrigger]:
```
##########
File path: airflow/serialization/serialized_objects.py
##########
@@ -444,14 +446,14 @@ class DependencyDetector:
@staticmethod
def detect_task_dependencies(task: BaseOperator) ->
Optional['DagDependency']:
"""Detects dependencies caused by tasks"""
- if task.task_type == "TriggerDagRunOperator":
+ if isinstance(task, TriggerDagRunOperator):
Review comment:
@kaxil do you think that perhaps there should be an attribute on the task
sensor (e.g. a class attribute that is set on it, and which could also be set
in other implementations) that we look at, instead of the class itself? That
way there could be other implementations of task-sensor-like operators which
do not inherit from the external task sensor but also have DAG dependencies,
so an operator needn't necessarily be a subclass. Maybe it's a bit too
magical, and I'm just thinking aloud, but perhaps we could look at the
arguments of the operator to infer whether a `DagDependency` should be created?
##########
File path: tests/serialization/test_dag_serialization.py
##########
@@ -1233,6 +1233,67 @@ def test_deps_sorted(self):
'airflow.ti_deps.deps.trigger_rule_dep.TriggerRuleDep',
]
+ def test_derived_dag_deps(self):
+ """
+ Tests DAG dependency detection, including derived classes
+ """
+ from airflow.operators.dummy import DummyOperator
+ from airflow.operators.trigger_dagrun import TriggerDagRunOperator
+ from airflow.sensors.external_task import ExternalTaskSensor
+
+ class DerivedSensor(ExternalTaskSensor):
+ template_fields = ['external_dag_id', 'run_id']
+
+ def __init__(self, *, external_dag_id: str, run_id: str = None,
**kwargs):
+ super().__init__(
+ external_dag_id=external_dag_id, execution_delta=None,
execution_date_fn=None, **kwargs
+ )
+ self.run_id = run_id
+
+ execution_date = datetime(2020, 1, 1)
+ for Sensor in [ExternalTaskSensor, DerivedSensor]:
+ with DAG(dag_id="test_derived_dag_deps_sensor",
start_date=execution_date) as dag:
+ task1 = Sensor(
+ task_id="task1",
+ external_dag_id="external_dag_id",
+ mode="reschedule",
+ )
+ task2 = DummyOperator(task_id="task2")
+ task1 >> task2
+
+ dag = SerializedDAG.to_dict(dag)
+ assert dag['dag']['dag_dependencies'] == [
+ {
+ 'source': 'external_dag_id',
+ 'target': 'test_derived_dag_deps_sensor',
+ 'dependency_type': 'sensor',
+ 'dependency_id': 'task1',
+ }
+ ]
+
+ class DerivedTrigger(TriggerDagRunOperator):
+ def __init__(self, **kwargs):
+ super().__init__(**kwargs)
Review comment:
You can omit `__init__` when the subclass does not change it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]