Repository: beam
Updated Branches:
  refs/heads/master 2e072a032 -> 8998cb90d
Correct Apex translation of Single-input Flatten Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed602aae Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed602aae Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed602aae Branch: refs/heads/master Commit: ed602aae0010bbe1ee09c15e11ace3d11139d0dc Parents: 2e072a0 Author: Thomas Weise <[email protected]> Authored: Mon Feb 27 22:32:28 2017 -0800 Committer: Thomas Groh <[email protected]> Committed: Tue Feb 28 09:19:54 2017 -0800 ---------------------------------------------------------------------- .../FlattenPCollectionTranslator.java | 9 ++++++--- .../apex/translation/TranslationContext.java | 14 ++++++++++++++ .../FlattenPCollectionTranslatorTest.java | 20 ++++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ed602aae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java index 928f135..2e31dfc 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java @@ -43,9 +43,9 @@ class FlattenPCollectionTranslator<T> implements @Override public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { - List<TaggedPValue> inputs = context.getInputs(); + List<PCollection<T>> inputCollections = extractPCollections(context.getInputs()); - if (inputs.isEmpty()) { + if 
(inputCollections.isEmpty()) { // create a dummy source that never emits anything @SuppressWarnings("unchecked") UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST, @@ -53,10 +53,13 @@ class FlattenPCollectionTranslator<T> implements ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( unboundedSource, context.getPipelineOptions()); context.addOperator(operator, operator.output); + } else if (inputCollections.size() == 1) { + context.addAlias(context.getOutput(), inputCollections.get(0)); } else { + @SuppressWarnings("unchecked") PCollection<T> output = (PCollection<T>) context.getOutput(); Map<PCollection<?>, Integer> unionTags = Collections.emptyMap(); - flattenCollections(extractPCollections(inputs), unionTags, output, context); + flattenCollections(inputCollections, unionTags, output, context); } } http://git-wip-us.apache.org/repos/asf/beam/blob/ed602aae/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index acd8ab1..fc49fc7 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -57,6 +57,7 @@ class TranslationContext { private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>(); private final Map<String, Operator> operators = new HashMap<>(); private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>(); + private Map<PInput, PInput> aliasCollections = new HashMap<>(); public void addView(PCollectionView<?> view) { this.viewInputs.put(view, this.getInput()); @@ -145,11 +146,24 @@ class 
TranslationContext { } public void addStream(PInput input, InputPort inputPort) { + while (aliasCollections.containsKey(input)) { + input = aliasCollections.get(input); + } Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input); checkArgument(stream != null, "no upstream operator defined for %s", input); stream.getRight().add(inputPort); } + /** + * Set the given output as alias for another input, + * i.e. there won't be a stream representation in the target DAG. + * @param alias + * @param source + */ + public void addAlias(PValue alias, PInput source) { + aliasCollections.put(alias, source); + } + public void populateDAG(DAG dag) { for (Map.Entry<String, Operator> nameAndOperator : this.operators.entrySet()) { dag.addOperator(nameAndOperator.getKey(), nameAndOperator.getValue()); http://git-wip-us.apache.org/repos/asf/beam/blob/ed602aae/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java index 7e678e8..b2e29b6 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.apex.translation; +import com.datatorrent.api.DAG; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import java.util.ArrayList; @@ -25,6 +26,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; +import org.apache.apex.api.EmbeddedAppLauncher; +import org.apache.apex.api.Launcher; +import org.apache.apex.api.Launcher.LaunchMode; 
import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.ApexRunnerResult; @@ -93,4 +97,20 @@ public class FlattenPCollectionTranslatorTest { } } + @Test + public void testFlattenSingleCollection() { + ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setRunner(ApexRunner.class); + ApexPipelineTranslator translator = new ApexPipelineTranslator(options); + EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); + DAG dag = launcher.getDAG(); + + Pipeline p = Pipeline.create(options); + PCollection<String> single = p.apply(Create.of(Collections.singletonList("1"))); + PCollectionList.of(single).apply(Flatten.<String>pCollections()) + .apply(ParDo.of(new EmbeddedCollector())); + translator.translate(p, dag); + Assert.assertNotNull(dag.getOperatorMeta("ParDo(EmbeddedCollector)")); + } + }
