shahar1 commented on code in PR #67688:
URL: https://github.com/apache/airflow/pull/67688#discussion_r3325594794
##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -544,49 +544,66 @@ def topological_sort(self) -> list[DAGNode]:
"""
Sort children topologically — a task always comes after its upstream
dependencies.
- Projects each child's per-task upstream IDs onto sibling-level integer
indices once,
- then runs a greedy multi-pass sweep using a bytearray-backed emission
flag. Equivalent
- in emission order to the previous modified-Kahn implementation, but
moves the per-edge
- ``upstream_list`` materialization and ``parent_group`` walks out of
the sweep's inner
- loop so they happen once per call instead of once per outer-loop pass.
+ Projects per-task upstream edges onto sibling-level integer indices,
then dispatches:
+
+ - Forward-declared DAGs (few/no children declared after their
dependents): greedy
+ multi-pass sweep over the projection, O(V + E) for the common case.
+ - Reverse-declared DAGs (many children declared before their
dependents): pass-number
+ traversal, O((V + E) log V), avoids the O(N²) blowup the sweep would
hit.
+
+ Both branches produce the same emission order: level-by-legacy-pass,
ties broken by
+ children insertion order.
"""
children = self.children
if not children:
return []
+
nodes = list(children.values())
+ n = len(nodes)
id_to_idx = {nid: i for i, nid in enumerate(children)}
- projected = [self._project_child_deps(i, c, id_to_idx) for i, c in
enumerate(nodes)]
+
+ projected: list[tuple[int, ...]] = [()] * n
+ nodes_with_back_edge = 0
+ for i, child in enumerate(nodes):
+ deps = self._project_child_deps(i, child, id_to_idx)
+ if deps:
+ projected[i] = deps
+ if any(d > i for d in deps):
+ nodes_with_back_edge += 1
+
+ if nodes_with_back_edge * 2 > n:
Review Comment:
Added an absolute back-edge cutoff (>= 32) and padded reverse-chain
regressions; the diluted review case now takes the fast path.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]