mr-runner: Graph.getSteps() to return with topological order, this fixes few 
CombineTests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e330d360
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e330d360
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e330d360

Branch: refs/heads/mr-runner
Commit: e330d360b4c90899b0ea94060955d519fd190a95
Parents: 989d7d8
Author: Pei He <[email protected]>
Authored: Fri Sep 1 13:06:49 2017 +0800
Committer: Pei He <[email protected]>
Committed: Fri Sep 1 17:13:53 2017 +0800

----------------------------------------------------------------------
 .../runners/mapreduce/translation/Graph.java    | 34 ++++++++++++--------
 1 file changed, 21 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e330d360/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
index 66e573f..144f9a4 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/Graph.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
 import com.google.common.collect.FluentIterable;
@@ -25,6 +27,7 @@ import com.google.common.collect.Sets;
 import com.google.common.graph.ElementOrder;
 import com.google.common.graph.GraphBuilder;
 import com.google.common.graph.MutableGraph;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -91,24 +94,29 @@ public class Graph<StepT extends Graph.AbstractStep, TagT 
extends Graph.Abstract
   }
 
   public List<StepT> getSteps() {
-    return castToStepList(FluentIterable.from(graph.nodes())
+    List<Vertex> ret = new ArrayList<>();
+
+    Set<Vertex> pendingNodes = Sets.newHashSet(graph.nodes());
+    while (!pendingNodes.isEmpty()) {
+      List<Vertex> readyNodes = new ArrayList<>();
+      for (Vertex v : pendingNodes) {
+        if (Sets.intersection(pendingNodes, graph.predecessors(v)).isEmpty()) {
+          readyNodes.add(v);
+        }
+      }
+      checkState(
+          !readyNodes.isEmpty(),
+          "No ready nodes found, there are cycles in graph: " + graph);
+      ret.addAll(readyNodes);
+      pendingNodes.removeAll(readyNodes);
+    }
+    return castToStepList(FluentIterable.from(ret)
         .filter(new Predicate<Vertex>() {
           @Override
           public boolean apply(Vertex input) {
             return input instanceof AbstractStep;
           }}))
-        .toSortedList(new Comparator<StepT>() {
-          @Override
-          public int compare(StepT left, StepT right) {
-            if (left.equals(right)) {
-              return 0;
-            } else if (com.google.common.graph.Graphs.reachableNodes(graph, 
left).contains(right)) {
-              return -1;
-            } else {
-              return 1;
-            }
-          }
-        });
+        .toList();
   }
 
   public List<StepT> getStartSteps() {

Reply via email to