yuqian90 commented on a change in pull request #6633: [AIRFLOW-2279] Clear
tasks across DAGs if marked by ExternalTaskMarker
URL: https://github.com/apache/airflow/pull/6633#discussion_r358223426
##########
File path: airflow/models/dag.py
##########
@@ -950,6 +990,60 @@ def clear(
         if only_running:
             tis = tis.filter(TI.state == State.RUNNING)
+        if include_subdags:
+            # Recursively find external tasks indicated by ExternalTaskMarker
+            instances = tis.all()
+            for ti in instances:
+                if ti.operator == ExternalTaskMarker.__name__:
Review comment:
Hi @ashb I tried the following:
```
isinstance(ti.task, ExternalTaskMarker)
```
Unfortunately, `ti.task` is not set unless the `TaskInstance` was created
through its constructor (`__init__`).
For example, both of the following raise `AttributeError` in this scope:
```
(Pdb++) p ti.task
*** AttributeError: 'TaskInstance' object has no attribute 'task'
(Pdb++) p isinstance(ti.task, ExternalTaskMarker)
*** AttributeError: 'TaskInstance' object has no attribute 'task'
```
This works:
```
(Pdb++) p ti.operator
'ExternalTaskMarker'
```
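To illustrate why the attribute can be missing, here is a minimal sketch with a stand-in class. `FakeTaskInstance` and `load_without_init` are hypothetical names, not Airflow code, and the assumption is that the instances in this scope are rebuilt from stored fields without going through `__init__`:

```python
class FakeTaskInstance:
    """Hypothetical stand-in for TaskInstance; not the Airflow class."""

    def __init__(self, task_id, operator):
        self.task_id = task_id
        self.operator = operator
        # Only the constructor attaches the task object.
        self.task = object()


def load_without_init(task_id, operator):
    """Mimic a loader that restores stored fields but skips __init__."""
    ti = FakeTaskInstance.__new__(FakeTaskInstance)
    ti.task_id = task_id
    ti.operator = operator
    return ti


constructed = FakeTaskInstance("marker", "ExternalTaskMarker")
loaded = load_without_init("marker", "ExternalTaskMarker")

print(hasattr(constructed, "task"))  # attribute set by __init__
print(hasattr(loaded, "task"))       # attribute never assigned
print(loaded.operator)               # stored field is still available
```

In this sketch the stored `operator` string survives the reload while `task` does not, which matches what the debugger session shows.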
If `isinstance` is preferred, one other way to do it is something like this:
```
task = self.get_task(ti.task_id)
if isinstance(task, ExternalTaskMarker):
    if recursion_depth == 0:
        # Maximum recursion depth allowed is the recursion_depth of the first
        # ExternalTaskMarker in the tasks to be cleared.
        max_recursion_depth = task.recursion_depth
    ...
```
This looks up the task object from the DAG for every task instance, even when
it is not needed, so it is likely slightly slower than checking the value of
`ti.operator`. We should only do it if there's a strong reason.
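For comparison, a rough self-contained sketch of the two checks side by side. All classes here are simplified stand-ins defined for illustration (including the hypothetical `CustomMarker` subclass), not the real Airflow classes. Besides the extra lookup, the one behavioural difference it shows is that the name comparison only matches the exact class, while `isinstance` also matches subclasses:

```python
class BaseOperator:
    """Simplified stand-in for an operator base class."""

    def __init__(self, task_id):
        self.task_id = task_id


class ExternalTaskMarker(BaseOperator):
    """Stand-in marker; recursion_depth default is an arbitrary choice."""

    def __init__(self, task_id, recursion_depth=10):
        super().__init__(task_id)
        self.recursion_depth = recursion_depth


class CustomMarker(ExternalTaskMarker):
    """Hypothetical user-defined subclass of the marker."""


class FakeTI:
    """Stand-in task instance that stores only the operator class name."""

    def __init__(self, task):
        self.task_id = task.task_id
        self.operator = type(task).__name__


class FakeDag:
    def __init__(self, tasks):
        self.task_dict = {t.task_id: t for t in tasks}

    def get_task(self, task_id):
        return self.task_dict[task_id]

    def markers_by_name(self, tis):
        # String comparison: no task lookup, exact class name only.
        return [ti.task_id for ti in tis
                if ti.operator == ExternalTaskMarker.__name__]

    def markers_by_isinstance(self, tis):
        # One get_task() lookup per task instance; subclasses match too.
        return [ti.task_id for ti in tis
                if isinstance(self.get_task(ti.task_id), ExternalTaskMarker)]


tasks = [BaseOperator("plain"), ExternalTaskMarker("marker"),
         CustomMarker("custom_marker")]
dag = FakeDag(tasks)
tis = [FakeTI(t) for t in tasks]

by_name = dag.markers_by_name(tis)
by_isinstance = dag.markers_by_isinstance(tis)
print(by_name)
print(by_isinstance)
```

Whether subclasses of `ExternalTaskMarker` need to be recognised is a separate question from the lookup cost, but it could count as a reason to prefer one form over the other.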
@ashb @kaxil what do you think?