This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 3d4d945 Add ParDoTest 3d4d945 is described below commit 3d4d945390e3f89f1baeab6f678d7004ec79900d Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Thu Jan 17 15:18:36 2019 +0100 Add ParDoTest --- .../translation/batch/ParDoTest.java | 43 ++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java new file mode 100644 index 0000000..06dc191 --- /dev/null +++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java @@ -0,0 +1,43 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import java.io.Serializable; +import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.SparkRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +/** + * Test class for beam to spark {@link ParDo} translation. + */ +@RunWith(JUnit4.class) +public class ParDoTest implements Serializable { + private static Pipeline pipeline; + + @BeforeClass + public static void beforeClass(){ + PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + pipeline = Pipeline.create(options); + } + + @Test + public void testPardo(){ + PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + input.apply(ParDo.of(new DoFn<Integer, Integer>() { + @ProcessElement + public void processElement (ProcessContext context){ + context.output(context.element() + 1); + } + })); + pipeline.run(); + } + +}