Add WindowMappingFn to PCollectionView This exposes the explicit way in which Windows should be mapped.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/73133587 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/73133587 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/73133587 Branch: refs/heads/master Commit: 73133587217a6a8d1f55a254a59af39409000b31 Parents: e32a025 Author: Thomas Groh <[email protected]> Authored: Wed Apr 5 09:24:36 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Wed Apr 5 11:00:44 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/PCollectionViews.java | 41 +++++++++++++++--- .../apache/beam/sdk/values/PCollectionView.java | 10 +++++ .../sdk/testing/PCollectionViewTesting.java | 45 ++++++++++++++++++-- 3 files changed, 86 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java index c2e3153..7617253 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/PCollectionViews.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.InvalidWindows; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; @@ -64,9 +65,10 @@ public class PCollectionViews { boolean hasDefault, @Nullable T defaultValue, Coder<T> valueCoder) { - return new SimplePCollectionView<>( + return new SimplePCollectionView<>( pCollection, new SingletonViewFn<>(hasDefault, defaultValue, valueCoder), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), windowingStrategy, valueCoder); } @@ -80,7 +82,11 @@ public class PCollectionViews { WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { return new SimplePCollectionView<>( - pCollection, new IterableViewFn<T>(), windowingStrategy, valueCoder); + pCollection, + new IterableViewFn<T>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); } /** @@ -91,8 +97,12 @@ public class PCollectionViews { PCollection<T> pCollection, WindowingStrategy<?, W> windowingStrategy, Coder<T> valueCoder) { - return new SimplePCollectionView<>( - pCollection, new ListViewFn<T>(), windowingStrategy, valueCoder); + return new SimplePCollectionView<>( + pCollection, + new ListViewFn<T>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); } /** @@ -104,7 +114,11 @@ public class PCollectionViews { WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { return new SimplePCollectionView<>( - pCollection, new MapViewFn<K, V>(), windowingStrategy, valueCoder); + pCollection, + new MapViewFn<K, V>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); } /** @@ -116,7 +130,11 @@ public class PCollectionViews { WindowingStrategy<?, W> windowingStrategy, Coder<KV<K, V>> valueCoder) { return new SimplePCollectionView<>( - pCollection, new MultimapViewFn<K, V>(), windowingStrategy, valueCoder); + pCollection, + new MultimapViewFn<K, V>(), + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + windowingStrategy, + valueCoder); } /** @@ -308,6 +326,8 @@ public class PCollectionViews { /** A unique tag for the view, typed according to the elements underlying the view. */ private TupleTag<Iterable<WindowedValue<ElemT>>> tag; + private WindowMappingFn<W> windowMappingFn; + /** The windowing strategy for the PCollection underlying the view. */ private WindowingStrategy<?, W> windowingStrategy; @@ -327,6 +347,7 @@ public class PCollectionViews { PCollection<ElemT> pCollection, TupleTag<Iterable<WindowedValue<ElemT>>> tag, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + WindowMappingFn<W> windowMappingFn, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { super(pCollection.getPipeline()); @@ -334,6 +355,7 @@ public class PCollectionViews { if (windowingStrategy.getWindowFn() instanceof InvalidWindows) { throw new IllegalArgumentException("WindowFn of PCollectionView cannot be InvalidWindows"); } + this.windowMappingFn = windowMappingFn; this.tag = tag; this.windowingStrategy = windowingStrategy; this.viewFn = viewFn; @@ -349,12 +371,14 @@ public class PCollectionViews { private SimplePCollectionView( PCollection<ElemT> pCollection, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + WindowMappingFn<W> windowMappingFn, WindowingStrategy<?, W> windowingStrategy, Coder<ElemT> valueCoder) { this( pCollection, new TupleTag<Iterable<WindowedValue<ElemT>>>(), viewFn, + windowMappingFn, windowingStrategy, valueCoder); } @@ -378,6 +402,11 @@ public class PCollectionViews { } @Override + public WindowMappingFn<?> getWindowMappingFn() { + return windowMappingFn; + } + + @Override public PCollection<?> getPCollection() { return pCollection; } http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java index f2ddf55..d65912b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionView.java @@ -19,10 +19,13 @@ package org.apache.beam.sdk.values; import java.io.Serializable; import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.ViewFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; @@ -69,6 +72,13 @@ public interface PCollectionView<T> extends PValue, Serializable { ViewFn<Iterable<WindowedValue<?>>, T> getViewFn(); /** + * Returns the {@link WindowMappingFn} used to map windows from a main input to the side input of + * this {@link PCollectionView}. + */ + @Experimental(Kind.CORE_RUNNERS_ONLY) + WindowMappingFn<?> getWindowMappingFn(); + + /** * @deprecated this method will be removed entirely. The {@link PCollection} underlying a side * input, including its {@link WindowingStrategy}, is part of the side input's specification * with a {@link ParDo} transform, which will obtain that information via a package-private http://git-wip-us.apache.org/repos/asf/beam/blob/73133587/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java index b544812..e6b13c1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/PCollectionViewTesting.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; @@ -155,12 +156,38 @@ public final class PCollectionViewTesting { } /** - * A {@link PCollectionView} explicitly built from its {@link TupleTag}, - * {@link WindowingStrategy}, {@link Coder}, and conversion function. + * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link + * WindowingStrategy}, {@link Coder}, and conversion function. * * <p>This method is only recommended for use by runner implementors to test their - * implementations. It is very easy to construct a {@link PCollectionView} that does - * not respect the invariants required for proper functioning. + * implementations. It is very easy to construct a {@link PCollectionView} that does not respect + * the invariants required for proper functioning. + * + * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed + * values provided to the view during execution, results are unpredictable. + */ + public static <ElemT, ViewT> PCollectionView<ViewT> testingView( + PCollection<ElemT> pCollection, + TupleTag<Iterable<WindowedValue<ElemT>>> tag, + ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + Coder<ElemT> elemCoder, + WindowingStrategy<?, ?> windowingStrategy) { + return testingView( + pCollection, + tag, + viewFn, + windowingStrategy.getWindowFn().getDefaultWindowMappingFn(), + elemCoder, + windowingStrategy); + } + + /** + * A {@link PCollectionView} explicitly built from its {@link TupleTag}, {@link + * WindowingStrategy}, {@link Coder}, {@link ViewFn} and {@link WindowMappingFn}. + * + * <p>This method is only recommended for use by runner implementors to test their + * implementations. It is very easy to construct a {@link PCollectionView} that does not respect + * the invariants required for proper functioning. * * <p>Note that if the provided {@code WindowingStrategy} does not match that of the windowed * values provided to the view during execution, results are unpredictable. @@ -169,12 +196,14 @@ public final class PCollectionViewTesting { PCollection<ElemT> pCollection, TupleTag<Iterable<WindowedValue<ElemT>>> tag, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + WindowMappingFn<?> windowMappingFn, Coder<ElemT> elemCoder, WindowingStrategy<?, ?> windowingStrategy) { return new PCollectionViewFromParts<>( pCollection, tag, viewFn, + windowMappingFn, windowingStrategy, IterableCoder.of( WindowedValue.getFullCoder(elemCoder, windowingStrategy.getWindowFn().windowCoder()))); @@ -226,6 +255,7 @@ public final class PCollectionViewTesting { private PCollection<ElemT> pCollection; private TupleTag<Iterable<WindowedValue<ElemT>>> tag; private ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn; + private WindowMappingFn<?> windowMappingFn; private WindowingStrategy<?, ?> windowingStrategy; private Coder<Iterable<WindowedValue<ElemT>>> coder; @@ -233,11 +263,13 @@ public final class PCollectionViewTesting { PCollection<ElemT> pCollection, TupleTag<Iterable<WindowedValue<ElemT>>> tag, ViewFn<Iterable<WindowedValue<ElemT>>, ViewT> viewFn, + WindowMappingFn<?> windowMappingFn, WindowingStrategy<?, ?> windowingStrategy, Coder<Iterable<WindowedValue<ElemT>>> coder) { this.pCollection = pCollection; this.tag = tag; this.viewFn = viewFn; + this.windowMappingFn = windowMappingFn; this.windowingStrategy = windowingStrategy; this.coder = coder; } @@ -262,6 +294,11 @@ public final class PCollectionViewTesting { } @Override + public WindowMappingFn<?> getWindowMappingFn() { + return windowMappingFn; + } + + @Override public WindowingStrategy<?, ?> getWindowingStrategyInternal() { return windowingStrategy; }
