This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8cdc20f7e0a53de18afb70afd31da374dcf6d93e
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Dec 20 17:18:54 2018 +0100

    fix mock, wire mock in translators and create a main test.
---
 .../translation/batch/PipelineTranslatorBatch.java |  2 +-
 .../batch/ReadSourceTranslatorBatch.java           |  3 ++-
 .../batch/ReadSourceTranslatorMockBatch.java       | 21 +++++++--------------
 .../translation/io/DatasetSourceMock.java          |  6 +++---
 .../spark/structuredstreaming/SourceTest.java      | 16 ++++++++++++++++
 5 files changed, 29 insertions(+), 19 deletions(-)

diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
index 26f1b9c..9ccc712 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/PipelineTranslatorBatch.java
@@ -65,7 +65,7 @@ public class PipelineTranslatorBatch extends PipelineTranslator {
         PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
     TRANSFORM_TRANSLATORS.put(
-        PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
+        PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorMockBatch());
   }
 
   public PipelineTranslatorBatch(SparkPipelineOptions options) {
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 0b828fb..aed016a 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -40,6 +40,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalog.Catalog;
 import org.apache.spark.sql.catalyst.catalog.CatalogTable;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation;
 import org.apache.spark.sql.streaming.DataStreamReader;
 
 class ReadSourceTranslatorBatch<T>
@@ -63,8 +64,8 @@ class ReadSourceTranslatorBatch<T>
       throw new RuntimeException(e);
     }
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+
     Dataset<Row> rowDataset = dataStreamReader.load();
 
     //TODO initialize source : how, to get a reference to the DatasetSource instance that spark
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
index 5b1bada..504a64d 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java
@@ -17,28 +17,25 @@
  */
 package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
 
-import java.io.IOException;
-import org.apache.beam.runners.core.construction.ReadTranslation;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator;
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
-import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSource;
 import org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSourceMock;
-import org.apache.beam.sdk.io.BoundedSource;
-import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Encoder;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.streaming.DataStreamReader;
-import org.apache.spark.sql.types.StructType;
-import scala.reflect.ClassTag;
+
+/**
+ * Mock translator that generates a source of 0 to 999 and prints it.
+ * @param <T>
+ */
 class ReadSourceTranslatorMockBatch<T>
     implements TransformTranslator<PTransform<PBegin, PCollection<T>>> {
 
@@ -48,13 +45,8 @@ class ReadSourceTranslatorMockBatch<T>
   @Override
   public void translateTransform(
       PTransform<PBegin, PCollection<T>> transform, TranslationContext context) {
-    AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> rootTransform =
-        (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>)
-            context.getCurrentTransform();
-
-    String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$"));
     SparkSession sparkSession = context.getSparkSession();
-    DataStreamReader dataStreamReader = sparkSession.readStream().format(providerClassName);
+    DataStreamReader dataStreamReader = sparkSession.readStream().format(SOURCE_PROVIDER_CLASS);
 
     Dataset<Row> rowDataset = dataStreamReader.load();
 
@@ -77,5 +69,6 @@ class ReadSourceTranslatorMockBatch<T>
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
+    dataset.show();
   }
 }
diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
index fa42fdf..ec88364 100644
--- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
+++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/io/DatasetSourceMock.java
@@ -46,7 +46,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
   }
 
   /** This class can be mapped to Beam {@link BoundedSource}. */
-  private class DatasetMicroBatchReader implements MicroBatchReader {
+  private static class DatasetMicroBatchReader implements MicroBatchReader {
 
     @Override
     public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
     }
@@ -70,7 +70,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
     }
 
     @Override
     public StructType readSchema() {
-      return null;
+      return new StructType();
     }
 
     @Override
     public List<InputPartition<InternalRow>> planInputPartitions() {
@@ -86,7 +86,7 @@ public class DatasetSourceMock implements DataSourceV2, MicroBatchReadSupport {
   }
 
   /** This class is a mocked reader*/
-  private class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
+  private static class DatasetMicroBatchPartitionReaderMock implements InputPartitionReader<InternalRow> {
 
     private ArrayList<Integer> values;
     private int currentIndex = 0;
diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
new file mode 100644
index 0000000..eea9769
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/SourceTest.java
@@ -0,0 +1,16 @@
+package org.apache.beam.runners.spark.structuredstreaming;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+
+public class SourceTest {
+  public static void main(String[] args) {
+    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+    Pipeline pipeline = Pipeline.create(options);
+    pipeline.apply(Create.of(1));
+    pipeline.run();
+  }
+
+}
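For reference, the wiring in ReadSourceTranslatorMockBatch boils down to handing Spark the provider class name and letting it instantiate the DataSourceV2 mock reflectively. Below is a minimal sketch of driving DatasetSourceMock the same way outside the translator; it assumes a local SparkSession with spark-sql and the runner's classes on the classpath, and the class name MockSourceSmokeTest plus the local[2] master are illustrative, not part of this commit.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class MockSourceSmokeTest {
  public static void main(String[] args) {
    // Local session; the runner normally obtains this through TranslationContext.getSparkSession().
    SparkSession spark =
        SparkSession.builder().appName("MockSourceSmokeTest").master("local[2]").getOrCreate();

    // Same call chain as ReadSourceTranslatorMockBatch: pass the provider class name to format()
    // and let Spark resolve the DataSourceV2 / MicroBatchReadSupport implementation.
    Dataset<Row> rows =
        spark
            .readStream()
            .format(
                "org.apache.beam.runners.spark.structuredstreaming.translation.io.DatasetSourceMock")
            .load();

    // printSchema() works on a streaming Dataset without starting a query; since the mock's
    // readSchema() now returns an empty StructType, the printed schema has no fields.
    rows.printSchema();

    spark.stop();
  }
}

Note that this only exercises source resolution and schema inference; actually pulling the generated 0 to 999 values through the mock partition reader requires starting a streaming query via writeStream.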
