ephraimbuddy commented on code in PR #30271:
URL: https://github.com/apache/airflow/pull/30271#discussion_r1155708621
##########
airflow/models/abstractoperator.py:
##########
@@ -158,33 +158,57 @@ def get_flat_relative_ids(
self,
upstream: bool = False,
found_descendants: set[str] | None = None,
+ setup_only: bool = False,
+ teardown_only: bool = False,
) -> set[str]:
"""Get a flat set of relative IDs, upstream or downstream."""
+ if setup_only and not upstream:
+ raise RuntimeError("Unexpected combination: downstream and setup
only.")
+ if teardown_only and upstream:
+ raise RuntimeError("Unexpected combination: upstream and teardown
only.")
dag = self.get_dag()
if not dag:
return set()
if found_descendants is None:
found_descendants = set()
-
+ # todo: shall we be smart about inferring setups.... i.e. don't clear
it if not connected to
+ # a downstream teardown
+ downstream_teardowns = set()
+ if not teardown_only:
+
downstream_teardowns.update(self.get_flat_relative_ids(teardown_only=True))
task_ids_to_trace = self.get_direct_relative_ids(upstream)
while task_ids_to_trace:
task_ids_to_trace_next: set[str] = set()
for task_id in task_ids_to_trace:
if task_id in found_descendants:
continue
-
task_ids_to_trace_next.update(dag.task_dict[task_id].get_direct_relative_ids(upstream))
- found_descendants.add(task_id)
+ task = dag.task_dict[task_id]
+ if teardown_only and task._is_teardown:
+ found_descendants.add(task_id)
+ task_ids_to_trace_next |=
task.get_direct_relative_ids(upstream)
+ if task._is_setup:
+ is_relevant_setup = not
task.downstream_task_ids.isdisjoint(downstream_teardowns)
+ if setup_only and not is_relevant_setup:
+ continue
+ else:
Review Comment:
It seems like the `else` is not needed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]