mr-runner: fix the bug where steps are attached multiple times in a diamond-shaped DAG.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4e7062cd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4e7062cd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4e7062cd Branch: refs/heads/mr-runner Commit: 4e7062cd9de6ae3f0616033823fd995eb10a3744 Parents: 6c2390a Author: Pei He <p...@apache.org> Authored: Wed Aug 30 19:16:06 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Thu Aug 31 14:13:50 2017 +0800 ---------------------------------------------------------------------- .../runners/mapreduce/translation/JobPrototype.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/4e7062cd/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java index 39487fd..a0c6626 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/JobPrototype.java @@ -24,10 +24,12 @@ import com.google.common.base.Function; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Set; import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; @@ -139,7 +141,7 @@ public class 
JobPrototype { // Setup BeamReducer Graphs.Step reducerStartStep = gabwStep; - chainOperations(reducerStartStep, fusedStep); + chainOperations(reducerStartStep, fusedStep, Sets.<Graphs.Step>newHashSet()); conf.set( BeamReducer.BEAM_REDUCER_KV_CODER, Base64.encodeBase64String(SerializableUtils.serializeToByteArray( @@ -152,7 +154,7 @@ public class JobPrototype { } // Setup DoFns in BeamMapper. - chainOperations(startStep, fusedStep); + chainOperations(startStep, fusedStep, Sets.<Graphs.Step>newHashSet()); job.setMapOutputKeyClass(BytesWritable.class); job.setMapOutputValueClass(byte[].class); @@ -177,7 +179,8 @@ public class JobPrototype { return job; } - private void chainOperations(Graphs.Step current, Graphs.FusedStep fusedStep) { + private void chainOperations( + Graphs.Step current, Graphs.FusedStep fusedStep, Set<Graphs.Step> visited) { Operation<?> operation = current.getOperation(); List<Graphs.Tag> outputTags = fusedStep.getOutputTags(current); for (Graphs.Tag outTag : outputTags) { @@ -185,9 +188,12 @@ public class JobPrototype { operation.attachConsumer(outTag.getTupleTag(), consumer.getOperation()); } } + visited.add(current); for (Graphs.Tag outTag : outputTags) { for (Graphs.Step consumer : fusedStep.getConsumers(outTag)) { - chainOperations(consumer, fusedStep); + if (!visited.contains(consumer)) { + chainOperations(consumer, fusedStep, visited); + } } } }