This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
commit 010fcb664035eafb7f1db2e8bdc81c7f1260a003 Author: Etienne Chauchot <[email protected]> AuthorDate: Fri Mar 1 16:51:36 2019 +0100 Implement WindowAssignTranslatorBatch --- .../batch/WindowAssignTranslatorBatch.java | 24 ++++++++- .../translation/helpers/WindowingHelpers.java | 62 ++++++++++++++++++++-- 2 files changed, 81 insertions(+), 5 deletions(-) diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java index 51e21c2..b27181a 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/WindowAssignTranslatorBatch.java @@ -19,13 +19,35 @@ package org.apache.beam.runners.spark.structuredstreaming.translation.batch; import org.apache.beam.runners.spark.structuredstreaming.translation.TransformTranslator; import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.EncoderHelpers; +import org.apache.beam.runners.spark.structuredstreaming.translation.helpers.WindowingHelpers; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; +import org.apache.spark.sql.Dataset; class WindowAssignTranslatorBatch<T> implements TransformTranslator<PTransform<PCollection<T>, PCollection<T>>> { @Override public void translateTransform( - PTransform<PCollection<T>, PCollection<T>> transform, TranslationContext context) {} + PTransform<PCollection<T>, 
PCollection<T>> transform, TranslationContext context) { + + Window.Assign<T> assignTransform = (Window.Assign<T>) transform; + @SuppressWarnings("unchecked") + final PCollection<T> input = (PCollection<T>) context.getInput(); + @SuppressWarnings("unchecked") + final PCollection<T> output = (PCollection<T>) context.getOutput(); + + Dataset<WindowedValue<T>> inputDataset = context.getDataset(input); + if (WindowingHelpers.skipAssignWindows(assignTransform, context)) { + context.putDataset(output, inputDataset); + } else { + Dataset<WindowedValue<T>> outputDataset = inputDataset + .map(WindowingHelpers.assignWindowsMapFunction(assignTransform.getWindowFn()), + EncoderHelpers.windowedValueEncoder()); + context.putDataset(output, outputDataset); + } + } } diff --git a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java index 45b5153..8188782 100644 --- a/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java +++ b/runners/spark-structured-streaming/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/helpers/WindowingHelpers.java @@ -17,8 +17,17 @@ */ package org.apache.beam.runners.spark.structuredstreaming.translation.helpers; +import com.google.common.collect.Iterables; +import java.util.Collection; +import org.apache.beam.runners.spark.structuredstreaming.translation.TranslationContext; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; import 
org.apache.spark.api.java.function.MapFunction; +import org.joda.time.Instant; /** Helper functions for working with windows. */ public final class WindowingHelpers { @@ -33,8 +42,8 @@ public final class WindowingHelpers { */ public static <T> MapFunction<T, WindowedValue<T>> windowMapFunction() { return new MapFunction<T, WindowedValue<T>>() { - @Override - public WindowedValue<T> call(T t) { + + @Override public WindowedValue<T> call(T t) { return WindowedValue.valueInGlobalWindow(t); } }; @@ -48,10 +57,55 @@ public final class WindowingHelpers { */ public static <T> MapFunction<WindowedValue<T>, T> unwindowMapFunction() { return new MapFunction<WindowedValue<T>, T>() { - @Override - public T call(WindowedValue<T> t) { + + @Override public T call(WindowedValue<T> t) { return t.getValue(); } }; } + + /** + * Checks if the window transformation should be applied or skipped. + * + * <p>Window assignment is skipped if both the source and the destination use the global window, or if the user + * has not specified a WindowFn (meaning they are only changing triggering or allowed + * lateness). + * + */ + @SuppressWarnings("unchecked") public static <T, W extends BoundedWindow> boolean skipAssignWindows( + Window.Assign<T> transform, TranslationContext context) { + WindowFn<? super T, W> windowFnToApply = (WindowFn<? 
super T, W>) transform.getWindowFn(); + PCollection<T> input = (PCollection<T>) context.getInput(); + WindowFn<?, ?> windowFnOfInput = input.getWindowingStrategy().getWindowFn(); + return windowFnToApply == null || (windowFnOfInput instanceof GlobalWindows + && windowFnToApply instanceof GlobalWindows); + } + + public static <T, W extends BoundedWindow> MapFunction<WindowedValue<T>, WindowedValue<T>> assignWindowsMapFunction( + WindowFn<T, W> windowFn) { + return new MapFunction<WindowedValue<T>, WindowedValue<T>>() { + + @Override public WindowedValue<T> call(WindowedValue<T> windowedValue) throws Exception { + final BoundedWindow boundedWindow = Iterables.getOnlyElement(windowedValue.getWindows()); + final T element = windowedValue.getValue(); + final Instant timestamp = windowedValue.getTimestamp(); + Collection<W> windows = windowFn.assignWindows(windowFn.new AssignContext() { + + @Override public T element() { + return element; + } + + @Override public Instant timestamp() { + return timestamp; + } + + @Override public BoundedWindow window() { + return boundedWindow; + } + }); + return WindowedValue.of(element, timestamp, windows, windowedValue.getPane()); + } + }; + } } +
