This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 68d3d6798950888590fca915782d5288fe2d1e5a Author: Etienne Chauchot <[email protected]> AuthorDate: Thu Sep 19 17:20:31 2019 +0200 Improve performance of source: the mapper already calls windowedValueCoder.decode, no need to call it also in the Spark encoder --- .../translation/batch/ReadSourceTranslatorBatch.java | 9 ++++++--- .../translation/streaming/ReadSourceTranslatorStreaming.java | 9 ++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java index 2dcf66f..ceb87cf 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java @@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch<T> .load(); // extract windowedValue from Row - WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder - .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = + WindowedValue.FullWindowedValueCoder.of( + source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset<WindowedValue<T>> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), - EncoderHelpers.fromBeamCoder(windowedValueCoder)); + // using kryo bytes serialization because the mapper already calls + // windowedValueCoder.decode, no need to call it also in the Spark encoder + EncoderHelpers.windowedValueEncoder()); PCollection<T> output = (PCollection<T>) context.getOutput(); context.putDataset(output, dataset); diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java index 9e03d96..9f1e34d 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java @@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming<T> .load(); // extract windowedValue from Row - WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = WindowedValue.FullWindowedValueCoder - .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); + WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = + WindowedValue.FullWindowedValueCoder.of( + source.getOutputCoder(), GlobalWindow.Coder.INSTANCE); Dataset<WindowedValue<T>> dataset = rowDataset.map( RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder), - EncoderHelpers.fromBeamCoder(windowedValueCoder)); + // using kryo bytes serialization because the mapper already calls + // windowedValueCoder.decode, no need to call it also in the Spark encoder + EncoderHelpers.windowedValueEncoder()); PCollection<T> output = (PCollection<T>) context.getOutput(); context.putDataset(output, dataset);
