This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 7f1060aa189a625400a1fbcfc2503d3e721ade8f
Author: Etienne Chauchot <[email protected]>
AuthorDate: Fri Sep 27 11:22:15 2019 +0200

    Apply new Encoders to Window assign translation
---
 .../translation/batch/WindowAssignTranslatorBatch.java | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
index fb37f97..576b914 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java
@@ -23,6 +23,7 @@ import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.Enc
 import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.spark.sql.Dataset;
@@ -44,10 +45,13 @@ class WindowAssignTranslatorBatch<T>
     if (WindowingHelpers.skipAssignWindows(assignTransform, context)) {
       context.putDataset(output, inputDataset);
     } else {
+      WindowFn<T, ?> windowFn = assignTransform.getWindowFn();
+      WindowedValue.FullWindowedValueCoder<T> windoweVdalueCoder = WindowedValue.FullWindowedValueCoder
+          .of(input.getCoder(), windowFn.windowCoder());
       Dataset<WindowedValue<T>> outputDataset =
           inputDataset.map(
-              WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()),
-              EncoderHelpers.windowedValueEncoder());
+              WindowingHelpers.assignWindowsMapFunction(windowFn),
+              EncoderHelpers.fromBeamCoder(windoweVdalueCoder));
       context.putDataset(output, outputDataset);
     }
   }
