Replace valueInEmptyWindows with valueInGlobalWindow in Spark Function, and add per-value (non-RDD) windowing functions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d852c5b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d852c5b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d852c5b9

Branch: refs/heads/master
Commit: d852c5b9391a7dc6d9eea21f8a4e0905d2cd7b28
Parents: 5fab1c5
Author: Sela <[email protected]>
Authored: Thu Apr 14 22:02:20 2016 +0300
Committer: Sela <[email protected]>
Committed: Tue Apr 19 22:06:15 2016 +0300

----------------------------------------------------------------------
 .../spark/translation/WindowingHelpers.java | 38 +++++++++++++++++---
 1 file changed, 34 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d852c5b9/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
index e92b6d1..ec94f3e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/WindowingHelpers.java
@@ -29,8 +29,8 @@ public final class WindowingHelpers {
   }
 
   /**
-   * A function for converting a value to a {@link WindowedValue}. The resulting
-   * {@link WindowedValue} will be in no windows, and will have the default timestamp
+   * A Spark function for converting a value to a {@link WindowedValue}. The resulting
+   * {@link WindowedValue} will be in the global window, and will have the default timestamp == MIN
    * and pane.
    *
    * @param <T> The type of the object.
@@ -40,13 +40,13 @@ public final class WindowingHelpers {
     return new Function<T, WindowedValue<T>>() {
       @Override
       public WindowedValue<T> call(T t) {
-        return WindowedValue.valueInEmptyWindows(t);
+        return WindowedValue.valueInGlobalWindow(t);
       }
     };
   }
 
   /**
-   * A function for extracting the value from a {@link WindowedValue}.
+   * A Spark function for extracting the value from a {@link WindowedValue}.
    *
    * @param <T> The type of the object.
    * @return A function that accepts a {@link WindowedValue} and returns its value.
@@ -59,4 +59,34 @@ public final class WindowingHelpers {
       }
     };
   }
+
+  /**
+   * Same as windowFunction but for non-RDD values - not an RDD transformation!
+   *
+   * @param <T> The type of the object.
+   * @return A function that accepts an object and returns its {@link WindowedValue}.
+   */
+  public static <T> com.google.common.base.Function<T, WindowedValue<T>> windowValueFunction() {
+    return new com.google.common.base.Function<T, WindowedValue<T>>() {
+      @Override
+      public WindowedValue<T> apply(T t) {
+        return WindowedValue.valueInGlobalWindow(t);
+      }
+    };
+  }
+
+  /**
+   * Same as unwindowFunction but for non-RDD values - not an RDD transformation!
+   *
+   * @param <T> The type of the object.
+   * @return A function that accepts a {@link WindowedValue} and returns its value.
+   */
+  public static <T> com.google.common.base.Function<WindowedValue<T>, T> unwindowValueFunction() {
+    return new com.google.common.base.Function<WindowedValue<T>, T>() {
+      @Override
+      public T apply(WindowedValue<T> t) {
+        return t.getValue();
+      }
+    };
+  }
 }
