mr-runner: hack to work around ViewAsXXX.expand() returning the wrong output PValue.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5905efd3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5905efd3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5905efd3 Branch: refs/heads/mr-runner Commit: 5905efd3364f2cd27567126508576aac887a1f63 Parents: 98da2a2 Author: Pei He <[email protected]> Authored: Wed Aug 2 21:59:21 2017 +0800 Committer: Pei He <[email protected]> Committed: Thu Aug 31 14:13:48 2017 +0800 ---------------------------------------------------------------------- .../translation/TranslationContext.java | 54 +++++++++++++++----- 1 file changed, 42 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5905efd3/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java index 2b51df5..365bdc0 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/TranslationContext.java @@ -22,13 +22,17 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Function; import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import java.util.List; import java.util.Map; import org.apache.beam.runners.mapreduce.MapReducePipelineOptions; import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.ParDo; +import 
org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; @@ -82,6 +86,11 @@ public class TranslationContext { this.currentNode = node; for (Map.Entry<TupleTag<?>, PValue> entry : currentNode.getOutputs().entrySet()) { pValueToTupleTag.put(entry.getValue(), entry.getKey()); + // TODO: this is a hack to get around that ViewAsXXX.expand() return wrong output PValue. + if (node.getTransform() instanceof View.CreatePCollectionView) { + View.CreatePCollectionView view = (View.CreatePCollectionView) node.getTransform(); + pValueToTupleTag.put(view.getView(), view.getView().getTagInternal()); + } } } @@ -98,29 +107,50 @@ public class TranslationContext { } public List<Graphs.Tag> getInputTags() { - return FluentIterable.from(currentNode.getInputs().values()) + Iterable<PValue> inputs; + if (currentNode.getTransform() instanceof ParDo.MultiOutput) { + ParDo.MultiOutput parDo = (ParDo.MultiOutput) currentNode.getTransform(); + inputs = ImmutableList.<PValue>builder() + .add(getInput()).addAll(parDo.getSideInputs()).build(); + } else { + inputs = currentNode.getInputs().values(); + } + return FluentIterable.from(inputs) .transform(new Function<PValue, Graphs.Tag>() { @Override public Graphs.Tag apply(PValue pValue) { checkState( pValueToTupleTag.containsKey(pValue), String.format("Failed to find TupleTag for pValue: %s.", pValue)); - PCollection<?> pc = (PCollection<?>) pValue; - return Graphs.Tag.of( - pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder()); + if (pValue instanceof PCollection) { + PCollection<?> pc = (PCollection<?>) pValue; + return Graphs.Tag.of( + pc.getName(), pValueToTupleTag.get(pValue), pc.getCoder()); + } else { + return Graphs.Tag.of( + pValue.getName(), + pValueToTupleTag.get(pValue), + ((PCollectionView) pValue).getCoderInternal()); + } }}) .toList(); } public List<Graphs.Tag> 
getOutputTags() { - return FluentIterable.from(currentNode.getOutputs().entrySet()) - .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() { - @Override - public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) { - PCollection<?> pc = (PCollection<?>) entry.getValue(); - return Graphs.Tag.of(pc.getName(), entry.getKey(), pc.getCoder()); - }}) - .toList(); + if (currentNode.getTransform() instanceof View.CreatePCollectionView) { + PCollectionView view = ((View.CreatePCollectionView) currentNode.getTransform()).getView(); + return ImmutableList.of( + Graphs.Tag.of(view.getName(), view.getTagInternal(), view.getCoderInternal())); + } else { + return FluentIterable.from(currentNode.getOutputs().entrySet()) + .transform(new Function<Map.Entry<TupleTag<?>, PValue>, Graphs.Tag>() { + @Override + public Graphs.Tag apply(Map.Entry<TupleTag<?>, PValue> entry) { + PCollection<?> pc = (PCollection<?>) entry.getValue(); + return Graphs.Tag.of(pc.getName(), entry.getKey(), pc.getCoder()); + }}) + .toList(); + } } public TupleTag<?> getOnlyOutputTag() {
