Repository: incubator-beam Updated Branches: refs/heads/apex-runner 0a1b27895 -> 989e39987
BEAM-783 Add test to cover side inputs and outputs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7105d925 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7105d925 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7105d925 Branch: refs/heads/apex-runner Commit: 7105d925c51a49798849f01f1d7e0b4f3d4f51ad Parents: c9f1406 Author: Thomas Weise <[email protected]> Authored: Wed Oct 19 19:11:54 2016 -0700 Committer: Thomas Weise <[email protected]> Committed: Wed Oct 19 19:11:54 2016 -0700 ---------------------------------------------------------------------- .../translators/ParDoBoundMultiTranslator.java | 14 ++- .../functions/ApexFlattenOperator.java | 3 +- .../translators/ParDoBoundTranslatorTest.java | 96 +++++++++++++++++++- 3 files changed, 107 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java index 13f07c1..9135dd8 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java @@ -68,9 +68,19 @@ public class ParDoBoundMultiTranslator<InputT, OutputT> Map<TupleTag<?>, PCollection<?>> outputs = output.getAll(); Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size()); - int i = 0; for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { - ports.put(outputEntry.getValue(), operator.sideOutputPorts[i++]); + if (outputEntry.getKey() == transform.getMainOutputTag()) { + ports.put(outputEntry.getValue(), operator.output); + } else { + int portIndex = 0; + for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) { + if (tag == outputEntry.getKey()) { + ports.put(outputEntry.getValue(), operator.sideOutputPorts[portIndex]); + break; + } + portIndex++; + } + } } context.addOperator(operator, ports); context.addStream(context.getInput(), operator.input); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java index dd8fcd1..703b1f4 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java @@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory; public class ApexFlattenOperator<InputT> extends BaseOperator { private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class); - private boolean traceTuples = true; + private boolean traceTuples = false; private long inputWM1; private long inputWM2; @@ -121,4 +121,5 @@ public class ApexFlattenOperator<InputT> extends BaseOperator { @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>(); + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7105d925/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java index ad22acd..72b4299 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@ -26,6 +26,8 @@ import com.datatorrent.lib.util.KryoCloneUtils; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -48,9 +50,11 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; @@ -67,6 +71,8 @@ import org.slf4j.LoggerFactory; @RunWith(JUnit4.class) public class ParDoBoundTranslatorTest { private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorTest.class); + private static final long SLEEP_MILLIS = 500; + private static final long TIMEOUT_MILLIS = 30000; @Test public void test() throws Exception { @@ -94,13 +100,13 @@ public class ParDoBoundTranslatorTest { Assert.assertNotNull(om); Assert.assertEquals(om.getOperator().getClass(), ApexParDoOperator.class); - long timeout = System.currentTimeMillis() + 30000; + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; while (System.currentTimeMillis() < timeout) { if (EmbeddedCollector.RESULTS.containsAll(expected)) { break; } LOG.info("Waiting for expected results."); - Thread.sleep(1000); + Thread.sleep(SLEEP_MILLIS); } Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); } @@ -119,11 +125,12 @@ public class ParDoBoundTranslatorTest { } } - @SuppressWarnings("serial") private static class EmbeddedCollector extends OldDoFn<Object, Void> { + private static final long serialVersionUID = 1L; protected static final HashSet<Object> RESULTS = new HashSet<>(); public EmbeddedCollector() { + RESULTS.clear(); } @Override @@ -175,6 +182,7 @@ public class ParDoBoundTranslatorTest { Pipeline pipeline = Pipeline.create(options); PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3); + // TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown pipeline.run(); } @@ -203,4 +211,86 @@ public class ParDoBoundTranslatorTest { Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator)); } + + @Test + public void testMultiOutputParDoWithSideInputs() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); + options.setRunner(ApexRunner.class); // non-blocking run + Pipeline pipeline = Pipeline.create(options); + + List<Integer> inputs = Arrays.asList(3, -42, 666); + final TupleTag<String> mainOutputTag = new TupleTag<String>("main"); + final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput"); + + PCollectionView<Integer> sideInput1 = pipeline + .apply("CreateSideInput1", Create.of(11)) + .apply("ViewSideInput1", View.<Integer>asSingleton()); + PCollectionView<Integer> sideInputUnread = pipeline + .apply("CreateSideInputUnread", Create.of(-3333)) + .apply("ViewSideInputUnread", View.<Integer>asSingleton()); + PCollectionView<Integer> sideInput2 = pipeline + .apply("CreateSideInput2", Create.of(222)) + .apply("ViewSideInput2", View.<Integer>asSingleton()); + + PCollectionTuple outputs = pipeline + .apply(Create.of(inputs)) + .apply(ParDo.withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .of(new TestMultiOutputWithSideInputsFn( + Arrays.asList(sideInput1, sideInput2), + Arrays.<TupleTag<String>>asList()))); + + outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); + ApexRunnerResult result = (ApexRunnerResult) pipeline.run(); + + HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]", + "processing: -42: [11, 222]", "processing: 666: [11, 222]"); + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(SLEEP_MILLIS); + } + result.cancel(); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); + } + + private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> { + private static final long serialVersionUID = 1L; + + final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); + final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>(); + + public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews, + List<TupleTag<String>> sideOutputTupleTags) { + this.sideInputViews.addAll(sideInputViews); + this.sideOutputTupleTags.addAll(sideOutputTupleTags); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + outputToAllWithSideInputs(c, "processing: " + c.element()); + } + + private void outputToAllWithSideInputs(ProcessContext c, String value) { + if (!sideInputViews.isEmpty()) { + List<Integer> sideInputValues = new ArrayList<>(); + for (PCollectionView<Integer> sideInputView : sideInputViews) { + sideInputValues.add(c.sideInput(sideInputView)); + } + value += ": " + sideInputValues; + } + c.output(value); + for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) { + c.sideOutput(sideOutputTupleTag, + sideOutputTupleTag.getId() + ": " + value); + } + } + + } + }
