mr-runner: introduces duplicateFactor in FlattenOperation, which fixes testFlattenInputMultipleCopies().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5248ce42 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5248ce42 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5248ce42 Branch: refs/heads/mr-runner Commit: 5248ce42f3ab31e8952f6604ef804b342c57d962 Parents: 99bffd2 Author: Pei He <[email protected]> Authored: Fri Sep 1 15:10:48 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 1 17:13:53 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/translation/FlattenOperation.java | 9 ++- .../translation/FlattenTranslator.java | 69 +++++++++++++++++--- 2 files changed, 67 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5248ce42/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java index 191b346..3c5ac95 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java @@ -24,14 +24,19 @@ import org.apache.beam.sdk.util.WindowedValue; */ public class FlattenOperation<T> extends Operation<T> { - public FlattenOperation() { + private final int duplicateFactor; + + public FlattenOperation(int duplicateFactor) { super(1); + this.duplicateFactor = duplicateFactor; } @Override public void process(WindowedValue elem) { for (OutputReceiver receiver : getOutputReceivers()) { - receiver.process(elem); + for (int i = 0; i < duplicateFactor; ++i) { + 
receiver.process(elem); + } } } } http://git-wip-us.apache.org/repos/asf/beam/blob/5248ce42/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java index b869936..817f2bf 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java @@ -17,15 +17,22 @@ */ package org.apache.beam.runners.mapreduce.translation; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.values.TupleTag; /** * Translates a {@link Flatten} to a {@link FlattenOperation}. 
@@ -34,18 +41,62 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC @Override public void translateNode(Flatten.PCollections<T> transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - List<Graphs.Tag> inputTags = userGraphContext.getInputTags(); - Operation<?> operation; - if (inputTags.isEmpty()) { + + Map<Graphs.Tag, Integer> inputTagToCount = Maps.newHashMap(); + boolean containsDuplicates = false; + for (Graphs.Tag inputTag : userGraphContext.getInputTags()) { + Integer count = inputTagToCount.get(inputTag); + if (count == null) { + count = Integer.valueOf(0); + } + inputTagToCount.put(inputTag, ++count); + if (count > 1) { + containsDuplicates = true; + } + } + + if (inputTagToCount.isEmpty()) { // Create a empty source - operation = new SourceReadOperation(new EmptySource(), userGraphContext.getOnlyOutputTag()); + Operation<?> operation = + new SourceReadOperation(new EmptySource(), userGraphContext.getOnlyOutputTag()); + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), operation), + userGraphContext.getInputTags(), + userGraphContext.getOutputTags()); + } else if (!containsDuplicates) { + Operation<?> operation = new FlattenOperation(1); + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), operation), + userGraphContext.getInputTags(), + userGraphContext.getOutputTags()); } else { - operation = new FlattenOperation(); + List<Graphs.Tag> intermediateTags = new ArrayList<>(); + for (Map.Entry<Graphs.Tag, Integer> entry : inputTagToCount.entrySet()) { + Integer dupFactor = entry.getValue(); + Graphs.Tag inTag = entry.getKey(); + checkState( + dupFactor > 0, "dupFactor should be positive, but was: " + dupFactor); + if (dupFactor == 1) { + intermediateTags.add(inTag); + } else { + String dupStepName = userGraphContext.getStepName() + "/Dup-" + dupFactor; + Graphs.Tag outTag = Graphs.Tag.of( + dupStepName + ".out", + new 
TupleTag<T>(), + inTag.getCoder(), + inTag.getWindowingStrategy()); + context.addInitStep( + Graphs.Step.of(dupStepName, new FlattenOperation(dupFactor)), + ImmutableList.of(inTag), + ImmutableList.of(outTag)); + intermediateTags.add(outTag); + } + } + context.addInitStep( + Graphs.Step.of(userGraphContext.getStepName(), new FlattenOperation(1)), + intermediateTags, + userGraphContext.getOutputTags()); } - context.addInitStep( - Graphs.Step.of(userGraphContext.getStepName(), operation), - inputTags, - userGraphContext.getOutputTags()); } private static class EmptySource extends BoundedSource<Void> {
