This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new 3d4d945  Add ParDoTest
3d4d945 is described below

commit 3d4d945390e3f89f1baeab6f678d7004ec79900d
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Thu Jan 17 15:18:36 2019 +0100

    Add ParDoTest
---
 .../translation/batch/ParDoTest.java               | 43 ++++++++++++++++++++++
 1 file changed, 43 insertions(+)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
new file mode 100644
index 0000000..06dc191
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
@@ -0,0 +1,43 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.io.Serializable;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+/**
+ * Test class for Beam-to-Spark {@link ParDo} translation.
+ */
+@RunWith(JUnit4.class)
+public class ParDoTest implements Serializable {
+  private static Pipeline pipeline;
+
+  @BeforeClass
+  public static void beforeClass() {
+    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    pipeline = Pipeline.create(options);
+  }
+
+  @Test
+  public void testPardo() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    input.apply(ParDo.of(new DoFn<Integer, Integer>() {
+      @ProcessElement
+      public void processElement(ProcessContext context) {
+        context.output(context.element() + 1);
+      }
+    }));
+    pipeline.run();
+  }
+
+}

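Note that the committed test only runs the pipeline and does not assert on the ParDo output. Below is a minimal sketch of how such an assertion could be written with Beam's PAssert utility; the class name ParDoAssertionSketch, the use of a main method, and the default (direct) runner are illustrative assumptions and are not part of this commit.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/** Illustrative sketch (not part of this commit): verifying the +1 ParDo output with PAssert. */
public class ParDoAssertionSketch {

  public static void main(String[] args) {
    // Assumes a runner (e.g. the direct runner) is available on the classpath;
    // the commit above targets the structured-streaming SparkRunner instead.
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

    PCollection<Integer> output =
        pipeline
            .apply(Create.of(1, 2, 3))
            .apply(
                ParDo.of(
                    new DoFn<Integer, Integer>() {
                      @ProcessElement
                      public void processElement(ProcessContext context) {
                        // Same element-wise increment as the committed test.
                        context.output(context.element() + 1);
                      }
                    }));

    // PAssert registers an assertion that is checked when the pipeline runs,
    // so a mistranslated ParDo fails the run instead of passing silently.
    PAssert.that(output).containsInAnyOrder(2, 3, 4);

    pipeline.run().waitUntilFinish();
  }
}

The same assertion style could later be applied in the committed test, just before pipeline.run(), once the structured-streaming translation is far enough along to produce results to compare against.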