This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 47c20c27b261afcd7da9b807b040c78bd7db2495
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Fri Jan 11 10:39:10 2019 +0100

    Add serialization test
---
 .../spark/structuredstreaming/SourceTest.java      | 51 +++++++++++++++++++++-
 1 file changed, 50 insertions(+), 1 deletion(-)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
index 8263718..c348ed5 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -1,17 +1,37 @@
 package org.apache.beam.runners.spark.structuredstreaming;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.construction.PipelineOptionsSerializationUtils;
+import org.apache.beam.runners.core.serialization.Base64Serializer;
+import org.apache.beam.runners.spark.structuredstreaming.translation.batch.DatasetSourceBatch;
+import org.apache.beam.runners.spark.structuredstreaming.utils.SerializationDebugger;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.spark.sql.sources.v2.DataSourceOptions;
+import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 
 /**
  * Test class for beam to spark source translation.
  */
-public class SourceTest {
+@RunWith(JUnit4.class)
+public class SourceTest implements Serializable {
   private static Pipeline pipeline;
+  @Rule // Must be non-null: testSerialization calls temporaryFolder.getRoot().
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
 
   @BeforeClass
   public static void beforeClass(){
@@ -21,6 +41,35 @@ public class SourceTest {
   }
 
   @Test
+  public void testSerialization() throws IOException{
+    Map<String, String> datasetSourceOptions = new HashMap<>();
+    BoundedSource<Integer> source = new BoundedSource<Integer>() {
+
+      @Override public List<? extends BoundedSource<Integer>> split(long desiredBundleSizeBytes,
+          PipelineOptions options) throws Exception {
+        return new ArrayList<>();
+      }
+
+      @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+        return 0;
+      }
+
+      @Override public BoundedReader<Integer> createReader(PipelineOptions options)
+          throws IOException {
+        return null;
+      }
+    };
+    String serializedSource = Base64Serializer.serializeUnchecked(source);
+    datasetSourceOptions.put("source", serializedSource);
+    datasetSourceOptions.put("defaultParallelism", "4");
+    datasetSourceOptions.put("pipelineOptions",
+        PipelineOptionsSerializationUtils.serializeToJson(pipeline.getOptions()));
+    DataSourceReader objectToTest = new DatasetSourceBatch()
+        .createReader(new DataSourceOptions(datasetSourceOptions));
+    SerializationDebugger.testSerialization(objectToTest, temporaryFolder.getRoot());
+  }
+
+  @Test
   public void testBoundedSource(){
     pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
     pipeline.run();