mr-runner: hack to work around ViewAsXXX.expand() returning the wrong output PValue.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5905efd3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5905efd3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5905efd3

Branch: refs/heads/mr-runner
Commit: 5905efd3364f2cd27567126508576aac887a1f63
Parents: 98da2a2
Author: Pei He <[email protected]>
Authored: Wed Aug 2 21:59:21 2017 +0800
Committer: Pei He <[email protected]>
Committed: Thu Aug 31 14:13:48 2017 +0800

----------------------------------------------------------------------
 .../translation/TranslationContext.java         | 54 +++++++++++++++-----
 1 file changed, 42 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5905efd3/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
index 2b51df5..365bdc0 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java
@@ -22,13 +22,17 @@ import static com.google.common.base.Preconditions.checkState;
 
 import com.google.common.base.Function;
 import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.mapreduce.MapReducePipelineOptions;
 import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -82,6 +86,11 @@ public class TranslationContext {
       this.currentNode = node;
       for (Map.Entry<TupleTag<?>, PValue> entry : 
currentNode.getOutputs().entrySet()) {
         pValueToTupleTag.put(entry.getValue(), entry.getKey());
+        // TODO: this is a hack to get around that ViewAsXXX.expand() return 
wrong output PValue.
+        if (node.getTransform() instanceof View.CreatePCollectionView) {
+          View.CreatePCollectionView view = (View.CreatePCollectionView) 
node.getTransform();
+          pValueToTupleTag.put(view.getView(), 
view.getView().getTagInternal());
+        }
       }
     }
 
@@ -98,29 +107,50 @@ public class TranslationContext {
     }
 
     public List<Graphs.Tag> getInputTags() {
-      return FluentIterable.from(currentNode.getInputs().values())
+      Iterable<PValue> inputs;
+      if (currentNode.getTransform() instanceof ParDo.MultiOutput) {
+        ParDo.MultiOutput parDo = (ParDo.MultiOutput) 
currentNode.getTransform();
+        inputs = ImmutableList.<PValue>builder()
+            .add(getInput()).addAll(parDo.getSideInputs()).build();
+      } else {
+        inputs = currentNode.getInputs().values();
+      }
+      return FluentIterable.from(inputs)
           .transform(new Function<PValue, Graphs.Tag>() {
             @Override
             public Graphs.Tag apply(PValue pValue) {
               checkState(
                   pValueToTupleTag.containsKey(pValue),
                   String.format("Failed to find TupleTag for pValue: %s.", 
pValue));
-              PCollection<?> pc = (PCollection<?>) pValue;
-              return Graphs.Tag.of(
-                  pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
+              if (pValue instanceof PCollection) {
+                PCollection<?> pc = (PCollection<?>) pValue;
+                return Graphs.Tag.of(
+                    pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder());
+              } else {
+                return Graphs.Tag.of(
+                    pValue.getName(),
+                    pValueToTupleTag.get(pValue),
+                    ((PCollectionView) pValue).getCoderInternal());
+              }
             }})
           .toList();
     }
 
     public List<Graphs.Tag> getOutputTags() {
-      return FluentIterable.from(currentNode.getOutputs().entrySet())
-          .transform(new Function<Map.Entry<TupleTag<?>, PValue>, 
Graphs.Tag>() {
-            @Override
-            public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
-              PCollection<?> pc = (PCollection<?>) entry.getValue();
-              return Graphs.Tag.of(pc.getName(), entry.getKey(), 
pc.getCoder());
-            }})
-          .toList();
+      if (currentNode.getTransform() instanceof View.CreatePCollectionView) {
+        PCollectionView view = ((View.CreatePCollectionView) 
currentNode.getTransform()).getView();
+        return ImmutableList.of(
+            Graphs.Tag.of(view.getName(), view.getTagInternal(), 
view.getCoderInternal()));
+      } else {
+        return FluentIterable.from(currentNode.getOutputs().entrySet())
+            .transform(new Function<Map.Entry<TupleTag<?>, PValue>, 
Graphs.Tag>() {
+              @Override
+              public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) {
+                PCollection<?> pc = (PCollection<?>) entry.getValue();
+                return Graphs.Tag.of(pc.getName(), entry.getKey(), 
pc.getCoder());
+              }})
+            .toList();
+      }
     }
 
     public TupleTag<?> getOnlyOutputTag() {

Reply via email to