This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7d456b42c1bafef6eab281dc2ed2dd098f8bda6a Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Sep 6 13:24:18 2019 +0200 Apply new Encoders to CombinePerKey --- .../translation/batch/CombinePerKeyTranslatorBatch.java | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java index e0e80dd..33b037a 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombinePerKeyTranslatorBatch.java @@ -23,6 +23,8 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTr import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.KVHelpers; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -56,8 +58,11 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> Dataset<WindowedValue<KV<K, InputT>>> inputDataset = context.getDataset(input); + Coder<K> keyCoder = (Coder<K>) input.getCoder().getCoderArguments().get(0); + Coder<OutputT> outputTCoder = (Coder<OutputT>) output.getCoder().getCoderArguments().get(1); + KeyValueGroupedDataset<K, WindowedValue<KV<K, InputT>>> groupedDataset = - inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.genericEncoder()); + 
inputDataset.groupByKey(KVHelpers.extractKey(), EncoderHelpers.fromBeamCoder(keyCoder)); Dataset<Tuple2<K, Iterable<WindowedValue<OutputT>>>> combinedDataset = groupedDataset.agg( @@ -66,6 +71,10 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> .toColumn()); // expand the list into separate elements and put the key back into the elements + Coder<KV<K, OutputT>> kvCoder = KvCoder.of(keyCoder, outputTCoder); + WindowedValue.WindowedValueCoder<KV<K, OutputT>> wvCoder = + WindowedValue.FullWindowedValueCoder.of( + kvCoder, input.getWindowingStrategy().getWindowFn().windowCoder()); Dataset<WindowedValue<KV<K, OutputT>>> outputDataset = combinedDataset.flatMap( (FlatMapFunction< @@ -85,7 +94,7 @@ class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT> } return result.iterator(); }, - EncoderHelpers.windowedValueEncoder()); + EncoderHelpers.fromBeamCoder(wvCoder)); context.putDataset(output, outputDataset); } }
