kaxil commented on code in PR #67688:
URL: https://github.com/apache/airflow/pull/67688#discussion_r3324795988
##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -625,6 +642,47 @@ def _sweep_projection(self, nodes: list[DAGNode], projected: list[tuple[int, ...
pending = next_pending
return order
+ def _sort_via_pass_numbering(
Review Comment:
This new branch has no test pinning its emission order against the sweep.
The reverse-chain cases in `test_topological_sort_shape_correctness` route
through here, but `_assert_valid_topological_order` only checks the result is
*a* valid topological sort, not that it matches what `_sweep_projection` emits.
The order-sensitive tests (`test_topological_sort1`/`2`,
`test_topological_nested_groups`) use forward-declared DAGs and route through
the sweep, so nothing pins the "both branches produce identical order"
invariant this PR rests on.
Worth a test that builds a reverse-declared DAG and asserts
`_sort_via_pass_numbering` and `_sweep_projection` return identical orders. I
checked equivalence empirically across ~150k random DAGs and it holds, so this
is a coverage gap rather than a bug, but it's the property most likely to
silently break in a future refactor.
##########
task-sdk/src/airflow/sdk/definitions/taskgroup.py:
##########
@@ -544,49 +544,66 @@ def topological_sort(self) -> list[DAGNode]:
"""
Sort children topologically — a task always comes after its upstream
dependencies.
- Projects each child's per-task upstream IDs onto sibling-level integer indices once,
- then runs a greedy multi-pass sweep using a bytearray-backed emission flag. Equivalent
- in emission order to the previous modified-Kahn implementation, but moves the per-edge
- ``upstream_list`` materialization and ``parent_group`` walks out of the sweep's inner
- loop so they happen once per call instead of once per outer-loop pass.
+ Projects per-task upstream edges onto sibling-level integer indices, then dispatches:
+
+ - Forward-declared DAGs (few/no children declared after their dependents): greedy
+ multi-pass sweep over the projection, O(V + E) for the common case.
+ - Reverse-declared DAGs (many children declared before their dependents): pass-number
+ traversal, O((V + E) log V), avoids the O(N²) blowup the sweep would hit.
+
+ Both branches produce the same emission order: level-by-legacy-pass, ties broken by
+ children insertion order.
"""
children = self.children
if not children:
return []
+
nodes = list(children.values())
+ n = len(nodes)
id_to_idx = {nid: i for i, nid in enumerate(children)}
- projected = [self._project_child_deps(i, c, id_to_idx) for i, c in enumerate(nodes)]
+
+ projected: list[tuple[int, ...]] = [()] * n
+ nodes_with_back_edge = 0
+ for i, child in enumerate(nodes):
+ deps = self._project_child_deps(i, child, id_to_idx)
+ if deps:
+ projected[i] = deps
+ if any(d > i for d in deps):
+ nodes_with_back_edge += 1
+
+ if nodes_with_back_edge * 2 > n:
Review Comment:
`nodes_with_back_edge * 2 > n` is a ratio over *all* children, so a long
reverse chain padded with independent children silently stays on the O(N²)
sweep, the exact case this PR targets.
Concretely: a reverse chain `r0 >> r1 >> ... >> r_{L-1}` on its own (`n=L`)
has `L-1` back-edge nodes, so `(L-1)*2 > L` is true and it correctly takes the
fast path. Add ~`L` isolated or forward-declared tasks and now `n≈2L` while the
back-edge count is still `L-1`, so the ratio drops below 0.5 and
`nodes_with_back_edge * 2 > n` is false, falling back to the quadratic sweep. A
2000-node DAG with a 1000-long reverse chain plus 1000 independent tasks stays
quadratic.
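The arithmetic can be reproduced on a standalone model. This sketch assumes the same index-tuple projection as the diff and a sweep that rescans every pending child each pass; `padded_reverse_chain`, `back_edge_nodes` and `sweep_cost` are hypothetical helpers, not Airflow code.

```python
def padded_reverse_chain(length: int, padding: int) -> list[tuple[int, ...]]:
    """Children 0..length-1 form a reverse-declared chain (child i depends on
    child i + 1, which is declared later), followed by `padding` independent children."""
    chain = [(i + 1,) for i in range(length - 1)] + [()]
    return chain + [()] * padding


def back_edge_nodes(projected: list[tuple[int, ...]]) -> int:
    """Children with at least one dependency declared after them."""
    return sum(1 for i, deps in enumerate(projected) if any(d > i for d in deps))


def sweep_cost(projected: list[tuple[int, ...]]) -> tuple[int, int]:
    """(passes, inner-loop visits) for a greedy multi-pass sweep."""
    emitted = bytearray(len(projected))
    pending = list(range(len(projected)))
    passes = visits = 0
    while pending:
        passes += 1
        visits += len(pending)  # every pending child is rescanned each pass
        next_pending = []
        for i in pending:
            if all(emitted[d] for d in projected[i]):
                emitted[i] = 1
            else:
                next_pending.append(i)
        if len(next_pending) == len(pending):
            raise ValueError("cycle among children")
        pending = next_pending
    return passes, visits


for padding in (0, 1000):
    proj = padded_reverse_chain(1000, padding)
    back = back_edge_nodes(proj)
    print(len(proj), back, back * 2 > len(proj), sweep_cost(proj))
# 1000 999 True (1000, 500500)
# 2000 999 False (1000, 501500)
```

Padding leaves the pass count and the quadratic visit count essentially unchanged, while flipping the ratio check from True to False.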
Worth dispatching on something the dilution can't defeat (an absolute
back-edge count, or reverse-chain depth) in addition to / instead of the ratio,
with a one-line comment on what shape the cutoff discriminates. Mirror the
change in `airflow-core/src/airflow/serialization/definitions/taskgroup.py:244`
and add a padded-reverse-chain dispatch regression.
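One possible shape for the dispatch and its regression, sketched on the same index-tuple model. `use_pass_numbering`, `padded_reverse_chain` and the `MIN_BACK_EDGE_NODES = 64` cutoff are hypothetical; the threshold is not from the PR and would need benchmarking. The bound in the docstring holds for this model: a child's pass only exceeds its deps' passes when it has a later-declared dep, so the sweep runs at most `back + 1` passes.

```python
# Hypothetical cutoff, not a value from the PR; it would need benchmarking.
MIN_BACK_EDGE_NODES = 64


def use_pass_numbering(projected: list[tuple[int, ...]]) -> bool:
    """Decide whether to skip the greedy sweep.

    Each extra sweep pass is caused by a back-edge child, so the sweep runs at
    most (back + 1) passes over the children and their edges. Capping `back`
    absolutely bounds the sweep at roughly MIN_BACK_EDGE_NODES passes, no
    matter how many independent children dilute the ratio.
    """
    n = len(projected)
    back = sum(1 for i, deps in enumerate(projected) if any(d > i for d in deps))
    return back * 2 > n or back >= MIN_BACK_EDGE_NODES


def padded_reverse_chain(length: int, padding: int) -> list[tuple[int, ...]]:
    """Reverse-declared chain (child i depends on i + 1) plus independent children."""
    return [(i + 1,) for i in range(length - 1)] + [()] + [()] * padding


def test_padded_reverse_chain_takes_pass_numbering() -> None:
    # The ratio alone is False here (999 * 2 <= 2000); the absolute count catches it.
    assert use_pass_numbering(padded_reverse_chain(1000, 1000))


def test_forward_declared_group_keeps_sweep() -> None:
    forward_chain = [()] + [(i - 1,) for i in range(1, 1000)]
    assert not use_pass_numbering(forward_chain)


test_padded_reverse_chain_takes_pass_numbering()
test_forward_declared_group_keeps_sweep()
```

Reverse-chain depth (the maximum pass number) would give the exact pass count, but computing it costs a traversal of its own; the back-edge count already falls out of the projection loop in the diff.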
--
This is an automated message from the Apache Git Service.