This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 3cc256e5f81616d8b4126cef6ae8d049fb03460f
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Sep 6 17:49:10 2019 +0200

    Apply new Encoders to Read source
---
 .../translation/batch/ReadSourceTranslatorBatch.java              | 8 ++++++--
 .../spark/structuredstreaming/translation/helpers/RowHelpers.java | 4 +---
 .../translation/streaming/ReadSourceTranslatorStreaming.java      | 7 +++++--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
index 6ae6646..2dcf66f 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ReadSourceTranslatorBatch.java
@@ -27,6 +27,7 @@ import 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -69,10 +70,13 @@ class ReadSourceTranslatorBatch<T>
             .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = 
WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
+
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
-            
RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
index 6ee0e07..ac74c29 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/RowHelpers.java
@@ -43,13 +43,11 @@ public final class RowHelpers {
    * @return A {@link MapFunction} that accepts a {@link Row} and returns its 
{@link WindowedValue}.
    */
   public static <T> MapFunction<Row, WindowedValue<T>> 
extractWindowedValueFromRowMapFunction(
-      Coder<T> coder) {
+      WindowedValue.WindowedValueCoder<T> windowedValueCoder) {
     return (MapFunction<Row, WindowedValue<T>>)
         value -> {
           // there is only one value put in each Row by the 
InputPartitionReader
           byte[] bytes = (byte[]) value.get(0);
-          WindowedValue.FullWindowedValueCoder<T> windowedValueCoder =
-              WindowedValue.FullWindowedValueCoder.of(coder, 
GlobalWindow.Coder.INSTANCE);
           return windowedValueCoder.decode(new ByteArrayInputStream(bytes));
         };
   }
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
index c3d07ff..9e03d96 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/streaming/ReadSourceTranslatorStreaming.java
@@ -27,6 +27,7 @@ import 
org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Row
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -70,10 +71,12 @@ class ReadSourceTranslatorStreaming<T>
             .load();
 
     // extract windowedValue from Row
+    WindowedValue.FullWindowedValueCoder<T> windowedValueCoder = 
WindowedValue.FullWindowedValueCoder
+        .of(source.getOutputCoder(), GlobalWindow.Coder.INSTANCE);
     Dataset<WindowedValue<T>> dataset =
         rowDataset.map(
-            
RowHelpers.extractWindowedValueFromRowMapFunction(source.getOutputCoder()),
-            EncoderHelpers.windowedValueEncoder());
+            
RowHelpers.extractWindowedValueFromRowMapFunction(windowedValueCoder),
+            EncoderHelpers.fromBeamCoder(windowedValueCoder));
 
     PCollection<T> output = (PCollection<T>) context.getOutput();
     context.putDataset(output, dataset);

Reply via email to