mr-runner: introduces duplicateFactor in FlattenOperation; this fixes
testFlattenInputMultipleCopies().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5248ce42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5248ce42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5248ce42

Branch: refs/heads/mr-runner
Commit: 5248ce42f3ab31e8952f6604ef804b342c57d962
Parents: 99bffd2
Author: Pei He <[email protected]>
Authored: Fri Sep 1 15:10:48 2017 +0800
Committer: Pei He <[email protected]>
Committed: Fri Sep 1 17:13:53 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/translation/FlattenOperation.java |  9 ++-
 .../translation/FlattenTranslator.java          | 69 +++++++++++++++++---
 2 files changed, 67 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5248ce42/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
index 191b346..3c5ac95 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenOperation.java
@@ -24,14 +24,19 @@ import org.apache.beam.sdk.util.WindowedValue;
  */
 public class FlattenOperation<T> extends Operation<T> {
 
-  public FlattenOperation() {
+  private final int duplicateFactor;
+
+  public FlattenOperation(int duplicateFactor) {
     super(1);
+    this.duplicateFactor = duplicateFactor;
   }
 
   @Override
   public void process(WindowedValue elem) {
     for (OutputReceiver receiver : getOutputReceivers()) {
-      receiver.process(elem);
+      for (int i = 0; i < duplicateFactor; ++i) {
+        receiver.process(elem);
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/5248ce42/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
index b869936..817f2bf 100644
--- 
a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
+++ 
b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -17,15 +17,22 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.NoSuchElementException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.values.TupleTag;
 
 /**
  * Translates a {@link Flatten} to a {@link FlattenOperation}.
@@ -34,18 +41,62 @@ public class FlattenTranslator<T> extends 
TransformTranslator.Default<Flatten.PC
   @Override
   public void translateNode(Flatten.PCollections<T> transform, 
TranslationContext context) {
     TranslationContext.UserGraphContext userGraphContext = 
context.getUserGraphContext();
-    List<Graphs.Tag> inputTags = userGraphContext.getInputTags();
-    Operation<?> operation;
-    if (inputTags.isEmpty()) {
+
+    Map<Graphs.Tag, Integer> inputTagToCount = Maps.newHashMap();
+    boolean containsDuplicates = false;
+    for (Graphs.Tag inputTag : userGraphContext.getInputTags()) {
+      Integer count = inputTagToCount.get(inputTag);
+      if (count == null) {
+        count = Integer.valueOf(0);
+      }
+      inputTagToCount.put(inputTag, ++count);
+      if (count > 1) {
+        containsDuplicates = true;
+      }
+    }
+
+    if (inputTagToCount.isEmpty()) {
       // Create an empty source
-      operation = new SourceReadOperation(new EmptySource(), 
userGraphContext.getOnlyOutputTag());
+      Operation<?> operation =
+          new SourceReadOperation(new EmptySource(), 
userGraphContext.getOnlyOutputTag());
+      context.addInitStep(
+          Graphs.Step.of(userGraphContext.getStepName(), operation),
+          userGraphContext.getInputTags(),
+          userGraphContext.getOutputTags());
+    } else if (!containsDuplicates) {
+      Operation<?> operation = new FlattenOperation(1);
+      context.addInitStep(
+          Graphs.Step.of(userGraphContext.getStepName(), operation),
+          userGraphContext.getInputTags(),
+          userGraphContext.getOutputTags());
     } else {
-      operation = new FlattenOperation();
+      List<Graphs.Tag> intermediateTags = new ArrayList<>();
+      for (Map.Entry<Graphs.Tag, Integer> entry : inputTagToCount.entrySet()) {
+        Integer dupFactor = entry.getValue();
+        Graphs.Tag inTag = entry.getKey();
+        checkState(
+            dupFactor > 0, "dupFactor should be positive, but was: " + 
dupFactor);
+        if (dupFactor == 1) {
+          intermediateTags.add(inTag);
+        } else {
+          String dupStepName = userGraphContext.getStepName() + "/Dup-" + 
dupFactor;
+          Graphs.Tag outTag = Graphs.Tag.of(
+              dupStepName + ".out",
+              new TupleTag<T>(),
+              inTag.getCoder(),
+              inTag.getWindowingStrategy());
+          context.addInitStep(
+              Graphs.Step.of(dupStepName, new FlattenOperation(dupFactor)),
+              ImmutableList.of(inTag),
+              ImmutableList.of(outTag));
+          intermediateTags.add(outTag);
+        }
+      }
+      context.addInitStep(
+          Graphs.Step.of(userGraphContext.getStepName(), new 
FlattenOperation(1)),
+          intermediateTags,
+          userGraphContext.getOutputTags());
     }
-    context.addInitStep(
-        Graphs.Step.of(userGraphContext.getStepName(), operation),
-        inputTags,
-        userGraphContext.getOutputTags());
   }
 
   private static class EmptySource extends BoundedSource<Void> {

Reply via email to