uranusjr commented on code in PR #30271:
URL: https://github.com/apache/airflow/pull/30271#discussion_r1233693640
##########
airflow/models/abstractoperator.py:
##########
@@ -188,6 +188,44 @@ def get_flat_relatives(self, upstream: bool = False) ->
Collection[Operator]:
return set()
return [dag.task_dict[task_id] for task_id in
self.get_flat_relative_ids(upstream=upstream)]
+ def get_upstreams_follow_setups(self) -> Iterable[Operator]:
+ """All upstreams and, for each upstream setup, its respective
teardowns."""
+ for task in self.get_flat_relatives(upstream=True):
+ yield task
+ if task.is_setup:
+ for t in task.downstream_list:
+ if t.is_teardown and not t == self:
+ yield t
+
+ def get_upstreams_only_setups_and_teardowns(self) -> Iterable[Operator]:
+ """
+ Only *relevant* upstream setups and their teardowns.
+
+ This method is meant to be used when we are clearing the task
(non-upstream) and we need
+ to add in the *relevant* setups and their teardowns.
+
+ Relevant in this case means, the setup has a teardown that is
downstream of ``self``,
+ *or*, the setup has *no teardowns*.
+ """
+ downstream_teardown_ids = {
+ x.task_id for x in self.get_flat_relatives(upstream=False) if
x.is_teardown
+ }
+ for task in self.get_flat_relatives(upstream=True):
+ if not task.is_setup:
+ continue
+ is_relevant = False
+ # setup has teardown downstream of `self`
+ if not
task.downstream_task_ids.isdisjoint(downstream_teardown_ids):
+ is_relevant = True
+ # setup has no teardowns
+ elif not [x for x in task.downstream_list if x.is_teardown]:
Review Comment:
```suggestion
elif not any(x.is_teardown for x in task.downstream_list):
```
(Not sure if I got this right, the point is to use `any`)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]