This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 4e0f7a0eeb56791f9d3f66873573eab946f5cbf5 Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Dec 28 10:16:01 2018 +0100 Use raw Encoder<WindowedValue> also in regular ReadSourceTranslatorBatch --- .../translation/TranslationContext.java | 1 - .../batch/ReadSourceTranslatorBatch.java | 22 ++++++++++------------ .../batch/ReadSourceTranslatorMockBatch.java | 2 ++ 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java index 82aa80b..acc49f4 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/TranslationContext.java @@ -115,7 +115,6 @@ public class TranslationContext { } } - //TODO: remove. 
It is just for testing public void putDatasetRaw(PValue value, Dataset<WindowedValue> dataset) { if (!datasets.containsKey(value)) { datasets.put(value, dataset); diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 370e3f4..d980a52 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -21,7 +21,6 @@ import java.io.IOException; import org.apache.beam.runners.core.construction.ReadTranslation; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; -import org.apache.beam.runners.spark.structuredstreaming.translation.streaming.DatasetStreamingSource; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; @@ -30,9 +29,9 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; -import org.apache.spark.sql.streaming.DataStreamReader; class ReadSourceTranslatorBatch<T> implements TransformTranslator<PTransform<PBegin, PCollection<T>>> { @@ -47,7 +46,6 @@ class ReadSourceTranslatorBatch<T> (AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>>) context.getCurrentTransform(); - 
String providerClassName = SOURCE_PROVIDER_CLASS.substring(0, SOURCE_PROVIDER_CLASS.indexOf("$")); BoundedSource<T> source; try { source = ReadTranslation.boundedSourceFromTransform(rootTransform); @@ -56,20 +54,20 @@ class ReadSourceTranslatorBatch<T> } SparkSession sparkSession = context.getSparkSession(); - Dataset<Row> rowDataset = sparkSession.read().format(providerClassName).load(); + Dataset<Row> rowDataset = sparkSession.read().format(SOURCE_PROVIDER_CLASS).load(); - //TODO initialize source : how, to get a reference to the DatasetStreamingSource instance that spark - // instantiates to be able to call DatasetStreamingSource.initialize(). How to pass in a DatasetCatalog? - MapFunction<Row, WindowedValue<T>> func = new MapFunction<Row, WindowedValue<T>>() { - @Override public WindowedValue<T> call(Row value) throws Exception { + //TODO pass the source and the translation context serialized as string to the DatasetSource + MapFunction<Row, WindowedValue> func = new MapFunction<Row, WindowedValue>() { + @Override public WindowedValue call(Row value) throws Exception { //there is only one value put in each Row by the InputPartitionReader - return value.<WindowedValue<T>>getAs(0); + return value.<WindowedValue>getAs(0); } }; - //TODO fix encoder: how to get an Encoder<WindowedValue<T>> - Dataset<WindowedValue<T>> dataset = rowDataset.map(func, null); + //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>> + // be created ? 
+ Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class)); PCollection<T> output = (PCollection<T>) context.getOutput(); - context.putDataset(output, dataset); + context.putDatasetRaw(output, dataset); } } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java index 758ff1d..d7b9175 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorMockBatch.java @@ -52,6 +52,8 @@ class ReadSourceTranslatorMockBatch<T> return value.<WindowedValue>getAs(0); } }; + //TODO: is there a better way than using the raw WindowedValue? Can an Encoder<WindowedValue<T>> + // be created ? Dataset<WindowedValue> dataset = rowDataset.map(func, Encoders.kryo(WindowedValue.class)); PCollection<T> output = (PCollection<T>) context.getOutput();
