kaxil commented on code in PR #67688:
URL: https://github.com/apache/airflow/pull/67688#discussion_r3324795988


##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -625,6 +642,47 @@ def _sweep_projection(self, nodes: list[DAGNode], projected: list[tuple[int, ...
             pending = next_pending
         return order
 
+    def _sort_via_pass_numbering(

Review Comment:
   This new branch has no test pinning its emission order against the sweep. 
The reverse-chain cases in `test_topological_sort_shape_correctness` route 
through here, but `_assert_valid_topological_order` only checks the result is 
*a* valid topological sort, not that it matches what `_sweep_projection` emits. 
The order-sensitive tests (`test_topological_sort1`/`2`, 
`test_topological_nested_groups`) use forward-declared DAGs and route through 
the sweep, so nothing pins the "both branches produce identical order" 
invariant this PR rests on.
   
   Worth a test that builds a reverse-declared DAG and asserts 
`_sort_via_pass_numbering` and `_sweep_projection` return identical orders. I 
checked equivalence empirically across ~150k random DAGs and it holds, so this 
is a coverage gap rather than a bug, but it's the property most likely to 
silently break in a future refactor.
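
   A minimal, self-contained sketch of that property check follows. It models the two branches rather than calling the PR's code: `sweep_order` and `pass_number_order` are hypothetical stand-ins for `_sweep_projection` and `_sort_via_pass_numbering` that work directly on the integer projection (`projected[i]` holds the sibling indices child `i` depends on). The sweep model assumes a node emitted earlier in a pass already satisfies later nodes in the same pass. The pass-number model assigns each node the pass in which the sweep would emit it and sorts by `(pass, index)`, i.e. the "level-by-legacy-pass, ties broken by children insertion order" rule from the docstring. It uses a plain sort, so it is O(V + E + V log V), which may differ from what the real branch does.

```python
import random


def sweep_order(projected: list[tuple[int, ...]]) -> list[int]:
    """Model of the greedy multi-pass sweep: each pass scans pending nodes in
    insertion order and emits any node whose dependencies are all emitted.
    A node emitted earlier in a pass already satisfies later nodes in it."""
    emitted = bytearray(len(projected))
    order: list[int] = []
    pending = list(range(len(projected)))
    while pending:
        next_pending = []
        for i in pending:
            if all(emitted[d] for d in projected[i]):
                emitted[i] = 1
                order.append(i)
            else:
                next_pending.append(i)
        if len(next_pending) == len(pending):
            raise ValueError("cycle in projection")
        pending = next_pending
    return order


def pass_number_order(projected: list[tuple[int, ...]]) -> list[int]:
    """Model of the pass-number branch: compute the sweep pass that would emit
    each node, then sort by (pass, insertion index).

    pass(i) = max(pass(d) + (d > i)) over deps d, or 0 with no deps: a
    dependency declared after i only becomes visible to i on the next pass.
    """
    n = len(projected)
    dependents: list[list[int]] = [[] for _ in range(n)]
    indegree = [len(deps) for deps in projected]
    for i, deps in enumerate(projected):
        for d in deps:
            dependents[d].append(i)
    passes = [0] * n
    ready = [i for i in range(n) if indegree[i] == 0]
    visited = 0
    while ready:  # Kahn traversal: passes[d] is final once d is popped
        d = ready.pop()
        visited += 1
        for i in dependents[d]:
            passes[i] = max(passes[i], passes[d] + (d > i))
            indegree[i] -= 1
            if indegree[i] == 0:
                ready.append(i)
    if visited != n:
        raise ValueError("cycle in projection")
    return sorted(range(n), key=lambda i: (passes[i], i))


def random_projection(n: int, p: float, rng: random.Random) -> list[tuple[int, ...]]:
    """Random DAG whose edges follow a hidden topological rank, so declaration
    order (the index) is unrelated to dependency order."""
    rank = list(range(n))
    rng.shuffle(rank)
    return [
        tuple(d for d in range(n) if rank[d] < rank[i] and rng.random() < p)
        for i in range(n)
    ]


def reverse_chain(length: int) -> list[tuple[int, ...]]:
    """r0 >> r1 >> ... declared last-to-first: every dependency is a later index."""
    return [(i + 1,) if i + 1 < length else () for i in range(length)]


if __name__ == "__main__":
    rng = random.Random(0)
    for _ in range(2000):
        proj = random_projection(rng.randint(1, 12), 0.3, rng)
        assert sweep_order(proj) == pass_number_order(proj)
    chain = reverse_chain(50)
    assert sweep_order(chain) == pass_number_order(chain) == list(range(49, -1, -1))
    print("orders match")
```

A real regression test would build a reverse-declared DAG, call both private methods on it, and compare the results the same way; the random generator only shows one way to decouple declaration order from dependency order.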



##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -544,49 +544,66 @@ def topological_sort(self) -> list[DAGNode]:
         """
         Sort children topologically — a task always comes after its upstream dependencies.
 
-        Projects each child's per-task upstream IDs onto sibling-level integer indices once,
-        then runs a greedy multi-pass sweep using a bytearray-backed emission flag. Equivalent
-        in emission order to the previous modified-Kahn implementation, but moves the per-edge
-        ``upstream_list`` materialization and ``parent_group`` walks out of the sweep's inner
-        loop so they happen once per call instead of once per outer-loop pass.
+        Projects per-task upstream edges onto sibling-level integer indices, then dispatches:
+
+        - Forward-declared DAGs (few/no children declared after their dependents): greedy
+          multi-pass sweep over the projection, O(V + E) for the common case.
+        - Reverse-declared DAGs (many children declared before their dependents): pass-number
+          traversal, O((V + E) log V), avoids the O(N²) blowup the sweep would hit.
+
+        Both branches produce the same emission order: level-by-legacy-pass, ties broken by
+        children insertion order.
         """
         children = self.children
         if not children:
             return []
+
         nodes = list(children.values())
+        n = len(nodes)
         id_to_idx = {nid: i for i, nid in enumerate(children)}
-        projected = [self._project_child_deps(i, c, id_to_idx) for i, c in enumerate(nodes)]
+
+        projected: list[tuple[int, ...]] = [()] * n
+        nodes_with_back_edge = 0
+        for i, child in enumerate(nodes):
+            deps = self._project_child_deps(i, child, id_to_idx)
+            if deps:
+                projected[i] = deps
+                if any(d > i for d in deps):
+                    nodes_with_back_edge += 1
+
+        if nodes_with_back_edge * 2 > n:

Review Comment:
   `nodes_with_back_edge * 2 > n` is a ratio over *all* children, so a long 
reverse chain padded with independent children silently stays on the O(N²) 
sweep, the exact case this PR targets.
   
   Concretely: a reverse chain `r0 >> r1 >> ... >> r_{L-1}` declared last-to-first
on its own (`n=L`) has `L-1` back-edge nodes, so `(L-1)*2 > L` holds for any
`L > 2` and it correctly takes the fast path. Add ~`L` isolated or
forward-declared tasks and now `n≈2L` while the back-edge count is still `L-1`,
so the ratio drops below 0.5, `nodes_with_back_edge * 2 > n` is false, and the
DAG falls back to the quadratic sweep. A 2000-node DAG with a 1000-long reverse
chain plus 1000 independent tasks (999 back-edge nodes, so `1998 > 2000` is
false) stays quadratic.
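
   The arithmetic can be reproduced with a small model. This is a sketch under stated assumptions, not the PR's code: `count_back_edge_nodes` mirrors the `nodes_with_back_edge` loop, `ratio_dispatch` mirrors the `* 2 > n` cutoff, and the hypothetical `sweep_passes` counts how many passes a sweep that rescans its pending list each time would need.

```python
def count_back_edge_nodes(projected: list[tuple[int, ...]]) -> int:
    """Children with at least one dependency declared after them (d > i),
    mirroring the nodes_with_back_edge counter in the diff."""
    return sum(1 for i, deps in enumerate(projected) if any(d > i for d in deps))


def ratio_dispatch(projected: list[tuple[int, ...]]) -> bool:
    """The PR's cutoff: pass-number branch only if over half the children have a back edge."""
    return count_back_edge_nodes(projected) * 2 > len(projected)


def sweep_passes(projected: list[tuple[int, ...]]) -> int:
    """Passes a greedy sweep needs; every pass rescans all still-pending nodes."""
    emitted = bytearray(len(projected))
    pending = list(range(len(projected)))
    passes = 0
    while pending:
        passes += 1
        next_pending = []
        for i in pending:
            if all(emitted[d] for d in projected[i]):
                emitted[i] = 1
            else:
                next_pending.append(i)
        if len(next_pending) == len(pending):
            raise ValueError("cycle in projection")
        pending = next_pending
    return passes


def reverse_chain(length: int) -> list[tuple[int, ...]]:
    """r0 >> r1 >> ... declared last-to-first: every dependency is a later index."""
    return [(i + 1,) if i + 1 < length else () for i in range(length)]


chain = reverse_chain(1000)
padded = chain + [()] * 1000  # plus 1000 independent tasks

print(count_back_edge_nodes(chain), ratio_dispatch(chain))    # 999 True
print(count_back_edge_nodes(padded), ratio_dispatch(padded))  # 999 False
print(sweep_passes(padded))                                   # 1000
```

The padded DAG keeps all 999 back-edge nodes yet fails the ratio test, and the sweep still needs one pass per chain link, each rescanning the remaining chain.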
   
   Worth dispatching on something the dilution can't defeat (an absolute 
back-edge count, or reverse-chain depth), either alongside or in place of the 
ratio, with a one-line comment on what shape the cutoff discriminates. Mirror the 
change in `airflow-core/src/airflow/serialization/definitions/taskgroup.py:244` 
and add a padded-reverse-chain dispatch regression.
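
   One possible shape for that dispatch, sketched under assumptions: `use_pass_numbering` and `MIN_BACK_EDGE_NODES` are hypothetical names, and the cutoff of 64 is a placeholder that would need benchmarking. An absolute count is a sound proxy because, along any dependency path, the legacy pass number only increases at a node that has a back edge, so the sweep needs at most `back + 1` passes; below the cutoff it stays within a constant factor of linear. Sending a shallow DAG to the pass-number branch only costs the log factor, since both branches emit the same order.

```python
# Hypothetical cutoff; a real value would come from benchmarking both branches.
MIN_BACK_EDGE_NODES = 64


def use_pass_numbering(projected: list[tuple[int, ...]]) -> bool:
    """Dispatch sketch: take the pass-number branch when back edges are dense
    (the PR's ratio test) or numerous in absolute terms."""
    n = len(projected)
    back = sum(1 for i, deps in enumerate(projected) if any(d > i for d in deps))
    # Cutoff discriminates reverse-declared shapes: the sweep needs up to back + 1
    # passes, so padding the DAG with independent tasks cannot hide a long chain.
    return back * 2 > n or back >= MIN_BACK_EDGE_NODES


def reverse_chain(length: int) -> list[tuple[int, ...]]:
    """r0 >> r1 >> ... declared last-to-first: every dependency is a later index."""
    return [(i + 1,) if i + 1 < length else () for i in range(length)]


# Padded-reverse-chain regression: 1000-long reverse chain + 1000 independent tasks.
assert use_pass_numbering(reverse_chain(1000) + [()] * 1000)
# A forward-declared chain has no back edges and stays on the sweep.
assert not use_pass_numbering([(i - 1,) if i else () for i in range(1000)])
```

A short reverse chain diluted below both thresholds still goes to the sweep, but by the bound above it costs at most `MIN_BACK_EDGE_NODES` passes.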



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
