This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 68d3d6798950888590fca915782d5288fe2d1e5a
Author: Etienne Chauchot <[email protected]>
AuthorDate: Thu Sep 19 17:20:31 2019 +0200

    Improve performance of source: the mapper already calls 
windowedValueCoder.decode, no need to call it also in the Spark encoder
---
 .../translation/batch/ReadSourceTranslatorBatch.java             | 9 ++++++---
 .../translation/streaming/ReadSourceTranslatorStreaming.java     | 9 ++++++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 2dcf66f..ceb87cf 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -70,13 +70,16 @@ class ReadSourceTranslatorBatch<T>
             .load();
 
     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = 
WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
 
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            // using kryo bytes serialization because the mapper already calls
+            // windowedValueCoder.decode, no need to call it also in the Spark 
encoder
+            EncoderHelpers.windowedValueEncoder());
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index 9e03d96..9f1e34d 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -71,12 +71,15 @@ class ReadSourceTranslatorStreaming<T>
             .load();
 
     // extract windowedValue from Row
-    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = 
WindowedValue.FullWindowedValueCoder
-        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
             
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
-            EncoderHelpers.fromBeamCoder(windowedValueCoder));
+            // using kryo bytes serialization because the mapper already calls
+            // windowedValueCoder.decode, no need to call it also in the Spark 
encoder
+            EncoderHelpers.windowedValueEncoder());
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);

Reply via email to