huashi-st commented on code in PR #64181:
URL: https://github.com/apache/airflow/pull/64181#discussion_r2990470991
##########
airflow-core/src/airflow/ti_deps/deps/trigger_rule_dep.py:
##########
@@ -619,6 +619,60 @@ def _evaluate_direct_relatives() -> Iterator[TIDepStatus]:
reason=f"No strategy to evaluate trigger rule '{trigger_rule_str}'."
)
+ def _evaluate_teardown_scope() -> Iterator[TIDepStatus]:
+ """Ensure all tasks between setup(s) and this teardown have completed."""
+ if not task.dag:
+ return
+
+ setup_task_ids = {t.task_id for t in task.upstream_list if t.is_setup}
+ if not setup_task_ids:
+ return
+
+ all_upstream_ids = task.get_flat_relative_ids(upstream=True)
+ indirect_upstream_ids = all_upstream_ids - task.upstream_task_ids
+
+ if not indirect_upstream_ids:
+ return
+
+ in_scope_ids = set()
+ for setup_id in setup_task_ids:
+ setup_obj = task.dag.get_task(setup_id)
+ in_scope_ids.update(indirect_upstream_ids & setup_obj.get_flat_relative_ids(upstream=False))
+
+ if not in_scope_ids:
+ return
+
+ in_scope_tasks = {tid: task.dag.get_task(tid) for tid in in_scope_ids}
+
+ finished_upstream_tis = (
+ x
+ for x in dep_context.ensure_finished_tis(ti.get_dagrun(session), session)
+ if _is_relevant_upstream(upstream=x, relevant_ids=in_scope_ids)
+ )
+ done = sum(1 for _ in finished_upstream_tis)
+
+ if not any(t.get_needs_expansion() for t in in_scope_tasks.values()):
+ expected = len(in_scope_tasks)
+ else:
+ task_id_counts = session.execute(
+ select(TaskInstance.task_id, func.count(TaskInstance.task_id))
+ .where(TaskInstance.dag_id == ti.dag_id, TaskInstance.run_id == ti.run_id)
+ .where(or_(*_iter_upstream_conditions(relevant_tasks=in_scope_tasks)))
+ .group_by(TaskInstance.task_id)
+ ).all()
+ expected = sum(count for _, count in task_id_counts)
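The scope computation in this hunk reduces to set arithmetic on task IDs: take the teardown's transitive upstream IDs, drop its direct upstreams (presumably left to the existing direct-relatives check), and keep only those that are also downstream of at least one of its setups. A minimal, self-contained sketch of that logic on a toy DAG; the `Task` dataclass and the `flat_relatives` helper are hypothetical stand-ins for Airflow's operator objects and `get_flat_relative_ids`, not Airflow API:

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Task:
    # Hypothetical stand-in for an Airflow operator; only what this sketch needs.
    task_id: str
    is_setup: bool = False
    upstream: set[str] = field(default_factory=set)  # direct upstream task IDs


def flat_relatives(dag: dict[str, Task], task_id: str, upstream: bool) -> set[str]:
    """Transitive upstream or downstream task IDs of task_id, excluding itself."""
    if upstream:
        edges = {tid: t.upstream for tid, t in dag.items()}
    else:
        # Invert the upstream edges so we can walk downstream.
        edges = {tid: set() for tid in dag}
        for tid, t in dag.items():
            for up in t.upstream:
                edges[up].add(tid)
    seen: set[str] = set()
    stack = list(edges[task_id])
    while stack:
        tid = stack.pop()
        if tid not in seen:
            seen.add(tid)
            stack.extend(edges[tid])
    return seen


def teardown_scope(dag: dict[str, Task], teardown_id: str) -> set[str]:
    """Indirect upstreams of the teardown that are also downstream of one of its setups."""
    teardown = dag[teardown_id]
    setup_ids = {tid for tid in teardown.upstream if dag[tid].is_setup}
    if not setup_ids:
        return set()
    indirect = flat_relatives(dag, teardown_id, upstream=True) - teardown.upstream
    in_scope: set[str] = set()
    for setup_id in setup_ids:
        in_scope |= indirect & flat_relatives(dag, setup_id, upstream=False)
    return in_scope


dag = {
    t.task_id: t
    for t in [
        Task("create_cluster", is_setup=True),
        Task("sensor"),
        Task("extract", upstream={"create_cluster", "sensor"}),
        Task("transform", upstream={"extract"}),
        Task("load", upstream={"transform"}),
        Task("delete_cluster", upstream={"create_cluster", "load"}),
    ]
}
print(sorted(teardown_scope(dag, "delete_cluster")))  # ['extract', 'transform']
```

Here `sensor` is an indirect upstream of the teardown but not downstream of the setup, so it falls out of scope; `load` is a direct upstream, so it is excluded from this set and left to the regular trigger-rule evaluation.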
Review Comment:
Very true from a best-practices perspective, but I chose to filter and count the finished TIs in Python because:
1. `_is_relevant_upstream` is hard to translate into SQL.
2. [`ensure_finished_tis` is cached](https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/ti_deps/dep_context.py#L95), and [`_evaluate_direct_relatives`](https://github.com/apache/airflow/pull/64181/changes#diff-d1137b22d5a81bf2b102846f18e06b077dbe7883844a11eaf17dbc5593b3707eR686) is always called beforehand to populate it, so this pass reuses the cached list instead of issuing another query. Performance should be fine.
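Point 2 relies on memoization: the first call runs the query and stores the result on the context, and later calls during the same dependency evaluation reuse that list. A minimal sketch of the pattern, with a counter showing that the second consumer triggers no extra query. `CachingDepContext`, `FinishedTI`, and the `fetch` callable are hypothetical stand-ins, not Airflow's `DepContext` API, and the plain membership test is a simplified placeholder for `_is_relevant_upstream`, which also has to reason about map indexes for mapped tasks:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class FinishedTI:
    # Hypothetical stand-in for a finished TaskInstance.
    task_id: str


@dataclass
class CachingDepContext:
    # Hypothetical sketch of the memoization pattern, not Airflow's DepContext.
    fetch: Callable[[], list[FinishedTI]]  # stands in for the database query
    finished_tis: list[FinishedTI] | None = None
    queries: int = 0  # how many times fetch() actually ran

    def ensure_finished_tis(self) -> list[FinishedTI]:
        if self.finished_tis is None:
            # Only the first caller pays for the query; later callers reuse the list.
            self.queries += 1
            self.finished_tis = self.fetch()
        return self.finished_tis


rows = [FinishedTI("extract"), FinishedTI("transform"), FinishedTI("sensor")]
ctx = CachingDepContext(fetch=lambda: list(rows))

# First consumer (the role _evaluate_direct_relatives plays) populates the cache.
ctx.ensure_finished_tis()

# Second consumer filters in Python, where an arbitrary predicate is easy to express.
in_scope_ids = {"extract", "transform"}
done = sum(1 for ti in ctx.ensure_finished_tis() if ti.task_id in in_scope_ids)
print(done, ctx.queries)  # 2 1
```

The trade-off is the one described in point 1: a Python-side filter can express any relevance rule, while the cache keeps the cost to a single query per evaluation.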
--
This is an automated message from the Apache Git Service.