This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 7d456b42c1bafef6eab281dc2ed2dd098f8bda6a
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Sep 6 13:24:18 2019 +0200

    Apply new Encoders to CombinePerKey
---
 .../translation/batch/CombinePerKeyTranslatorBatch.java     | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
index e0e80dd..33b037a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java
@@ -23,6 +23,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr
 import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers;
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -56,8 +58,11 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
 
     Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input);
 
+    Coder<K> keyCoder = (Coder<K>) input.getCoder().getCoderArguments().get(0);
+    Coder<OutputT> outputTCoder = (Coder<OutputT>) output.getCoder().getCoderArguments().get(1);
+
     KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset =
-        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder());
+        inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder));
 
     Dataset<Tuple2<K, Iterable<WindowedValue<OutputT>>>> combinedDataset =
         groupedDataset.agg(
@@ -66,6 +71,10 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
                 .toColumn());
 
     // expand the list into separate elements and put the key back into the elements
+    Coder<KV<K, OutputT>> kvCoder = KvCoder.of(keyCoder, outputTCoder);
+    WindowedValue.WindowedValueCoder<KV<K, OutputT>> wvCoder =
+        WindowedValue.FullWindowedValueCoder.of(
+            kvCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
     Dataset<WindowedValue<KV<K, OutputT>>> outputDataset =
         combinedDataset.flatMap(
             (FlatMapFunction<
@@ -85,7 +94,7 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
                   }
                   return result.iterator();
                 },
-            EncoderHelpers.windowedValueEncoder());
+            EncoderHelpers.fromBeamCoder(wvCoder));
     context.putDataset(output, outputDataset);
   }
 }

Reply via email to