yuqian90 commented on a change in pull request #6633: [AIRFLOW-2279] Clear
tasks across DAGs if marked by ExternalTaskMarker
URL: https://github.com/apache/airflow/pull/6633#discussion_r358144296
##########
File path: airflow/sensors/external_task_sensor.py
##########
@@ -161,3 +163,56 @@ def poke(self, context, session=None):
session.commit()
return count == len(dttm_filter)
+
+
+class ExternalTaskMarker(BaseOperator):
+ """
+ Use this operator to indicate that a task on a different DAG depends on
this task.
+ When this task is cleared with "Recursive" selected, Airflow will clear
the task on
+ the other DAG and its downstream tasks recursively. Transitive
dependencies are followed
+ until the recursion_depth is reached.
+ """
+ template_fields = ['external_dag_id', 'external_task_id', 'execution_date']
+ ui_color = '#19647e'
+
+ @apply_defaults
+ def __init__(self,
+ external_dag_id,
+ external_task_id,
+ execution_date: Optional[Union[str, datetime.datetime]] = "{{
execution_date.isoformat() }}",
+ recursion_depth: int = 10,
+ *args,
+ **kwargs):
+ """
+ :param external_dag_id: The dag_id that contains the task you want to
wait for
+ :type external_dag_id: str
+ :param external_task_id: The task_id that contains the task you want
to wait for.
+ :type external_task_id: str
+ :param execution_date: The execution_date of the task that you want to
wait for.
+ :type execution_date: str or datetime.datetime
+ :param recursion_depth: The maximum level of transitive dependencies
allowed. Default is 10.
+ This is mostly used for preventing cyclic dependencies. It is fine
to increase
+ this number if necessary. However, too many levels of transitive
dependencies will make
+ it slower to clear tasks in the web UI.
+ """
+ super().__init__(*args, **kwargs)
+
+ self.external_dag_id = external_dag_id
+ self.external_task_id = external_task_id
+ if isinstance(execution_date, datetime.datetime):
+ self.execution_date = execution_date.isoformat()
+ elif isinstance(execution_date, str):
+ self.execution_date = execution_date
+ else:
+ raise TypeError('Expected str or datetime.datetime type for
execution_date. Got {}'
+ .format(type(execution_date)))
+ if recursion_depth <= 0:
+ raise ValueError("recursion_depth should be a positive integer")
+ self.recursion_depth = recursion_depth
+
+ def execute(self, context):
+ """
+ Since the only purpose of this operator is to indicate a dependency on
an external DAG, this
+ method is a no-op.
Review comment:
Thanks. That's a good point. I'll change this to a subclass of
`DummyOperator`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services