mr-runner: Graph.getSteps() to return with topological order, this fixes few CombineTests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e330d360 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e330d360 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e330d360 Branch: refs/heads/mr-runner Commit: e330d360b4c90899b0ea94060955d519fd190a95 Parents: 989d7d8 Author: Pei He <[email protected]> Authored: Fri Sep 1 13:06:49 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 1 17:13:53 2017 +0800 ---------------------------------------------------------------------- .../runners/mapreduce/translation/Graph.java | 34 ++++++++++++-------- 1 file changed, 21 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e330d360/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java index 66e573f..144f9a4 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.mapreduce.translation; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; @@ -25,6 +27,7 @@ import com.google.common.collect.Sets; import com.google.common.graph.ElementOrder; import com.google.common.graph.GraphBuilder; import com.google.common.graph.MutableGraph; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -91,24 +94,29 @@ public class Graph<StepT extends Graph.AbstractStep, TagT extends Graph.Abstract } public List<StepT> getSteps() { - return castToStepList(FluentIterable.from(graph.nodes()) + List<Vertex> ret = new ArrayList<>(); + + Set<Vertex> pendingNodes = Sets.newHashSet(graph.nodes()); + while (!pendingNodes.isEmpty()) { + List<Vertex> readyNodes = new ArrayList<>(); + for (Vertex v : pendingNodes) { + if (Sets.intersection(pendingNodes, graph.predecessors(v)).isEmpty()) { + readyNodes.add(v); + } + } + checkState( + !readyNodes.isEmpty(), + "No ready nodes found, there are cycles in graph: " + graph); + ret.addAll(readyNodes); + pendingNodes.removeAll(readyNodes); + } + return castToStepList(FluentIterable.from(ret) .filter(new Predicate<Vertex>() { @Override public boolean apply(Vertex input) { return input instanceof AbstractStep; }})) - .toSortedList(new Comparator<StepT>() { - @Override - public int compare(StepT left, StepT right) { - if (left.equals(right)) { - return 0; - } else if (com.google.common.graph.Graphs.reachableNodes(graph, left).contains(right)) { - return -1; - } else { - return 1; - } - } - }); + .toList(); } public List<StepT> getStartSteps() {
