mr-runner: fix the bug that steps are attached multiple times in diamond shaped 
DAG.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e7062cd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e7062cd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e7062cd

Branch: refs/heads/mr-runner
Commit: 4e7062cd9de6ae3f0616033823fd995eb10a3744
Parents: 6c2390a
Author: Pei He <p...@apache.org>
Authored: Wed Aug 30 19:16:06 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Thu Aug 31 14:13:50 2017 +0800

----------------------------------------------------------------------
 .../runners/mapreduce/translation/JobPrototype.java   | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4e7062cd/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
index 39487fd..a0c6626 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java
@@ -24,10 +24,12 @@ import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -139,7 +141,7 @@ public class JobPrototype {
 
       // Setup BeamReducer
       Graphs.Step reducerStartStep = gabwStep;
-      chainOperations(reducerStartStep, fusedStep);
+      chainOperations(reducerStartStep, fusedStep, 
Sets.<Graphs.Step>newHashSet());
       conf.set(
           BeamReducer.BEAM_REDUCER_KV_CODER,
           Base64.encodeBase64String(SerializableUtils.serializeToByteArray(
@@ -152,7 +154,7 @@ public class JobPrototype {
     }
 
     // Setup DoFns in BeamMapper.
-    chainOperations(startStep, fusedStep);
+    chainOperations(startStep, fusedStep, Sets.<Graphs.Step>newHashSet());
 
     job.setMapOutputKeyClass(BytesWritable.class);
     job.setMapOutputValueClass(byte[].class);
@@ -177,7 +179,8 @@ public class JobPrototype {
     return job;
   }
 
-  private void chainOperations(Graphs.Step current, Graphs.FusedStep 
fusedStep) {
+  private void chainOperations(
+      Graphs.Step current, Graphs.FusedStep fusedStep, Set<Graphs.Step> 
visited) {
     Operation<?> operation = current.getOperation();
     List<Graphs.Tag> outputTags = fusedStep.getOutputTags(current);
     for (Graphs.Tag outTag : outputTags) {
@@ -185,9 +188,12 @@ public class JobPrototype {
         operation.attachConsumer(outTag.getTupleTag(), 
consumer.getOperation());
       }
     }
+    visited.add(current);
     for (Graphs.Tag outTag : outputTags) {
       for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) {
-        chainOperations(consumer, fusedStep);
+        if (!visited.contains(consumer)) {
+          chainOperations(consumer, fusedStep, visited);
+        }
       }
     }
   }

Reply via email to