This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 3cc256e5f81616d8b4126cef6ae8d049fb03460f Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Sep 6 17:49:10 2019 +0200 Apply new Encoders to Read source --- .../translation/batch/ReadSourceTranslatorBatch.java | 8 ++++++-- .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 7 +++++-- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 6ae6646..2dcf66f 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -27,6 +27,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch<T> .load(); // extract windowedValue from Row + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder + .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + Dataset<WindowedValue<T>> dataset = rowDataset.map( - RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()), - EncoderHelpers.windowedValueEncoder()); + RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), + 
EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection<T> output = (PCollection<T>) context.getOutput(); context.putDataset(output, dataset); diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java index 6ee0e07..ac74c29 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java @@ -43,13 +43,11 @@ public final class RowHelpers { * @return A {@link MapFunction} that accepts a {@link Row} and returns its {@link WindowedValue}. */ public static <T> MapFunction<Row, WindowedValue<T>> extractWindowedValueFromRowMapFunction( - Coder<T> coder) { + WindowedValue.WindowedValueCoder<T> windowedValueCoder) { return (MapFunction<Row, WindowedValue<T>>) value -> { // there is only one value put in each Row by the InputPartitionReader byte[] bytes = (byte[]) value.get(0); - WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = - WindowedValue.FullWindowedValueCoder.of(coder, GlobalWindow.Coder.INSTANCE); return windowedValueCoder.decode(new ByteArrayInputStream(bytes)); }; } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index c3d07ff..9e03d96 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -27,6 +27,7 @@ import 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming<T> .load(); // extract windowedValue from Row + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder + .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset<WindowedValue<T>> dataset = rowDataset.map( - RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()), - EncoderHelpers.windowedValueEncoder()); + RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), + EncoderHelpers.fromBeamCoder(windowedValueCoder)); PCollection<T> output = (PCollection<T>) context.getOutput(); context.putDataset(output, dataset);
