Remove references to multi-window representation from model

Some areas of the Beam model in the SDK allude to the use of a compressed representation of an element along with the set of windows it is assigned to. However, the model itself views elements in different windows as fully independent, so the SDK should not place any obligation on the runner or user to use a particular representation.
This change removes those places in the SDK where an element is treated in multiple windows at once. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/08104410 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/08104410 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/08104410 Branch: refs/heads/master Commit: 08104410177063b1095bd91b24b40f9961c92cf2 Parents: a3aa4c7 Author: Kenneth Knowles <[email protected]> Authored: Mon May 9 12:17:09 2016 -0700 Committer: Kenneth Knowles <[email protected]> Committed: Thu Jun 23 09:35:44 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/util/AssignWindowsDoFn.java | 15 ++++--- .../apache/beam/sdk/util/DoFnRunnerBase.java | 2 +- .../beam/sdk/util/ReduceFnRunnerTest.java | 3 +- .../apache/beam/sdk/util/ReduceFnTester.java | 46 +++++++++++--------- .../runners/direct/WindowEvaluatorFactory.java | 6 ++- .../direct/WindowEvaluatorFactoryTest.java | 4 +- .../FlinkStreamingTransformTranslators.java | 5 ++- .../functions/FlinkAssignContext.java | 15 ++++++- .../functions/FlinkNoElementAssignContext.java | 4 +- .../streaming/FlinkAbstractParDoWrapper.java | 4 +- .../flink/streaming/GroupAlsoByWindowTest.java | 2 +- .../beam/sdk/testing/WindowFnTestUtils.java | 5 ++- .../sdk/transforms/windowing/GlobalWindows.java | 5 --- .../windowing/PartitioningWindowFn.java | 5 --- .../beam/sdk/transforms/windowing/WindowFn.java | 11 +---- .../apache/beam/sdk/util/GatherAllPanes.java | 3 +- .../apache/beam/sdk/util/IdentityWindowFn.java | 20 +++------ .../org/apache/beam/sdk/util/Reshuffle.java | 3 +- .../sdk/util/IdentitySideInputWindowFn.java | 3 +- .../sdk/util/MergingActiveWindowSetTest.java | 6 +-- .../org/apache/beam/sdk/util/TriggerTester.java | 14 +++--- 21 files changed, 89 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java index caec40e..d40b007 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java @@ -20,22 +20,27 @@ package org.apache.beam.sdk.util; import static com.google.common.base.Preconditions.checkNotNull; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import com.google.common.collect.Iterables; + import org.joda.time.Instant; import java.util.Collection; /** - * {@link DoFn} that tags elements of a PCollection with windows, according - * to the provided {@link WindowFn}. + * {@link DoFn} that tags elements of a {@link PCollection} with windows, according to the provided + * {@link WindowFn}. + * * @param <T> Type of elements being windowed * @param <W> Window type */ @SystemDoFnInternal -public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> { +public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> + implements RequiresWindowAccess { private WindowFn<? super T, W> fn; public AssignWindowsDoFn(WindowFn<? super T, W> fn) { @@ -64,8 +69,8 @@ public class AssignWindowsDoFn<T, W extends BoundedWindow> extends DoFn<T, T> { } @Override - public Collection<? 
extends BoundedWindow> windows() { - return c.windowingInternals().windows(); + public BoundedWindow window() { + return Iterables.getOnlyElement(c.windowingInternals().windows()); } }); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java index 1ebe72b..a849eb2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java @@ -254,7 +254,7 @@ public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<Inpu } @Override - public Collection<? extends BoundedWindow> windows() { + public W window() { throw new UnsupportedOperationException( "WindowFn attempted to access input windows when none were available"); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index b7ec540..64fcae3 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -82,7 +82,6 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -258,7 +257,7 @@ public class ReduceFnRunnerTest { } @Override - public Collection<? 
extends BoundedWindow> windows() { + public BoundedWindow window() { throw new UnsupportedOperationException(); } })); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java index 9916c5c..e897f54 100644 --- a/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java @@ -401,21 +401,25 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { WindowTracing.trace("TriggerTester.injectElements: {}", value); } ReduceFnRunner<String, InputT, OutputT, W> runner = createRunner(); - runner.processElements(Iterables.transform( - Arrays.asList(values), new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() { - @Override - public WindowedValue<InputT> apply(TimestampedValue<InputT> input) { - try { - InputT value = input.getValue(); - Instant timestamp = input.getTimestamp(); - Collection<W> windows = windowFn.assignWindows(new TestAssignContext<W>( - windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE))); - return WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - })); + runner.processElements( + Iterables.transform( + Arrays.asList(values), + new Function<TimestampedValue<InputT>, WindowedValue<InputT>>() { + @Override + public WindowedValue<InputT> apply(TimestampedValue<InputT> input) { + try { + InputT value = input.getValue(); + Instant timestamp = input.getTimestamp(); + Collection<W> windows = + windowFn.assignWindows( + new TestAssignContext<W>( + windowFn, value, timestamp, GlobalWindow.INSTANCE)); + return 
WindowedValue.of(value, timestamp, windows, PaneInfo.NO_FIRING); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + })); // Persist after each bundle. runner.persist(); @@ -538,14 +542,14 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { extends WindowFn<Object, W>.AssignContext { private Object element; private Instant timestamp; - private Collection<? extends BoundedWindow> windows; + private BoundedWindow window; - public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp, - Collection<? extends BoundedWindow> windows) { + public TestAssignContext( + WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) { windowFn.super(); this.element = element; this.timestamp = timestamp; - this.windows = windows; + this.window = window; } @Override @@ -559,8 +563,8 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { } @Override - public Collection<? extends BoundedWindow> windows() { - return windows; + public BoundedWindow window() { + return window; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 6045912..67c2f17 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -29,6 +29,8 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; +import com.google.common.collect.Iterables; + import org.joda.time.Instant; 
import java.util.Collection; @@ -125,8 +127,8 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public Collection<? extends BoundedWindow> windows() { - return value.getWindows(); + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index c5faa5a..65dcfeb 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -326,11 +326,11 @@ public class WindowEvaluatorFactoryTest { private static class EvaluatorTestWindowFn extends NonMergingWindowFn<Long, BoundedWindow> { @Override public Collection<BoundedWindow> assignWindows(AssignContext c) throws Exception { - if (c.windows().contains(GlobalWindow.INSTANCE)) { + if (c.window().equals(GlobalWindow.INSTANCE)) { return Collections.<BoundedWindow>singleton(new IntervalWindow(c.timestamp(), c.timestamp().plus(1L))); } - return (Collection<BoundedWindow>) c.windows(); + return Collections.singleton(c.window()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index b3fed99..5d04068 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import com.google.api.client.util.Maps; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.flink.api.common.functions.FilterFunction; @@ -359,8 +360,8 @@ public class FlinkStreamingTransformTranslators { } @Override - public Collection<? extends BoundedWindow> windows() { - return c.windowingInternals().windows(); + public BoundedWindow window() { + return Iterables.getOnlyElement(c.windowingInternals().windows()); } }); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java index 7ea8c20..6abb8ff 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java @@ -17,10 +17,14 @@ */ package org.apache.beam.runners.flink.translation.functions; +import static com.google.common.base.Preconditions.checkArgument; + import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; +import com.google.common.collect.Iterables; + import org.joda.time.Instant; import java.util.Collection; @@ -35,6 +39,13 @@ class FlinkAssignContext<InputT, W extends BoundedWindow> FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { fn.super(); + checkArgument( + Iterables.size(value.getWindows()) == 1, + String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); this.value = value; } @@ -49,8 +60,8 @@ class FlinkAssignContext<InputT, W extends BoundedWindow> } @Override - public Collection<? extends BoundedWindow> windows() { - return value.getWindows(); + public BoundedWindow window() { + return Iterables.getOnlyElement(value.getWindows()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java index 892f7a1..d49821b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java @@ -65,7 +65,7 @@ class FlinkNoElementAssignContext<InputT, W extends BoundedWindow> } @Override - public Collection<? 
extends BoundedWindow> windows() { - throw new UnsupportedOperationException("No windows available."); + public BoundedWindow window() { + throw new UnsupportedOperationException("No window available."); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java index 3c37aa9..f68a519 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java @@ -245,9 +245,9 @@ public abstract class FlinkAbstractParDoWrapper<IN, OUTDF, OUTFL> extends RichFl } @Override - public Collection<? 
extends BoundedWindow> windows() { + public BoundedWindow window() { throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); + "WindowFn attempted to access input window when none was available"); } }); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java index 3e5a17d..207fb5a 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java @@ -508,7 +508,7 @@ public class GroupAlsoByWindowTest extends StreamingMultipleProgramsTestBase { } @Override - public Collection<? 
extends BoundedWindow> windows() { + public BoundedWindow window() { throw new UnsupportedOperationException( "WindowFn attempted to access input windows when none were available"); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java index a4130df..127721a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; @@ -115,8 +116,8 @@ public class WindowFnTestUtils { } @Override - public Collection<? 
extends BoundedWindow> windows() { - return null; + public BoundedWindow window() { + return GlobalWindow.INSTANCE; } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index 499ffeb..002bf2e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -53,11 +53,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> { } @Override - public boolean assignsToSingleWindow() { - return true; - } - - @Override public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) { return inputTimestamp; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java index b0dd8b9..da2f38c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java @@ -51,11 +51,6 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow> } @Override - public boolean assignsToSingleWindow() { - return true; - } - - @Override public Instant getOutputTime(Instant inputTimestamp, W window) { return inputTimestamp; } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 41833f8..d84866b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -65,10 +65,10 @@ public abstract class WindowFn<T, W extends BoundedWindow> public abstract Instant timestamp(); /** - * Returns the windows the current element was in, prior to this + * Returns the window of the current element prior to this * {@code WindowFn} being called. */ - public abstract Collection<? extends BoundedWindow> windows(); + public abstract BoundedWindow window(); } /** @@ -161,13 +161,6 @@ public abstract class WindowFn<T, W extends BoundedWindow> } /** - * Returns true if this {@code WindowFn} assigns each element to a single window. - */ - public boolean assignsToSingleWindow() { - return false; - } - - /** * {@inheritDoc} * * <p>By default, does not register any display data. 
Implementors may override this method http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java index 5a01c28..ab40678 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GatherAllPanes.java @@ -62,8 +62,7 @@ public class GatherAllPanes<T> .apply( Window.into( new IdentityWindowFn<KV<Void, WindowedValue<T>>>( - originalWindowFn.windowCoder(), - input.getWindowingStrategy().getWindowFn().assignsToSingleWindow())) + originalWindowFn.windowCoder())) .triggering(Never.ever())) // all values have the same key so they all appear as a single output element .apply(GroupByKey.<Void, WindowedValue<T>>create()) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java index 91e5609..a3477e9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.values.PCollection; import org.joda.time.Instant; import java.util.Collection; +import java.util.Collections; /** * A {@link WindowFn} that leaves all associations between elements and windows unchanged. @@ -55,25 +56,21 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { * these windows. 
*/ private final Coder<BoundedWindow> coder; - private final boolean assignsToSingleWindow; - public IdentityWindowFn(Coder<? extends BoundedWindow> coder, boolean assignsToSingleWindow) { + public IdentityWindowFn(Coder<? extends BoundedWindow> coder) { // Safe because it is only used privately here. // At every point where a window is returned or accepted, it has been provided - // by priorWindowFn, so it is of the expected type. + // by the prior WindowFn, so it is of the expected type. @SuppressWarnings("unchecked") Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder; this.coder = windowCoder; - this.assignsToSingleWindow = assignsToSingleWindow; } @Override public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext c) throws Exception { - // The windows are provided by priorWindowFn, which also provides the coder for them - @SuppressWarnings("unchecked") - Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows(); - return priorWindows; + // The window is provided by the prior WindowFn, which also provides the coder for them + return Collections.singleton(c.window()); } @Override @@ -88,17 +85,12 @@ class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> { @Override public Coder<BoundedWindow> windowCoder() { - // Safe because the previous WindowFn provides both the windows and the coder. + // Safe because the prior WindowFn provides both the windows and the coder. // The Coder is _not_ actually a coder for an arbitrary BoundedWindow. 
return coder; } @Override - public boolean assignsToSingleWindow() { - return assignsToSingleWindow; - } - - @Override public BoundedWindow getSideInputWindow(BoundedWindow window) { throw new UnsupportedOperationException( String.format( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index 5c91326..c0d159b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -58,8 +58,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti Window.Bound<KV<K, V>> rewindow = Window.<KV<K, V>>into( new IdentityWindowFn<>( - originalStrategy.getWindowFn().windowCoder(), - originalStrategy.getWindowFn().assignsToSingleWindow())) + originalStrategy.getWindowFn().windowCoder())) .triggering(new ReshuffleTrigger<>()) .discardingFiredPanes() .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java index db6f425..705003e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import 
org.apache.beam.sdk.transforms.windowing.WindowFn; import java.util.Collection; +import java.util.Collections; /** * A {@link WindowFn} for use during tests that returns the input window for calls to @@ -33,7 +34,7 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, Bound @Override public Collection<BoundedWindow> assignWindows(WindowFn<Integer, BoundedWindow>.AssignContext c) throws Exception { - return (Collection<BoundedWindow>) c.windows(); + return Collections.singleton(c.window()); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java index 84699d6..4750af1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/MergingActiveWindowSetTest.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.util.state.InMemoryStateInternals; @@ -39,7 +40,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -87,8 +87,8 @@ public class MergingActiveWindowSetTest { } @Override - public Collection<? 
extends BoundedWindow> windows() { - return ImmutableList.of(); + public BoundedWindow window() { + return GlobalWindow.INSTANCE; } }; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/08104410/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index a1e376e..c495712 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -245,7 +245,7 @@ public class TriggerTester<InputT, W extends BoundedWindow> { InputT value = input.getValue(); Instant timestamp = input.getTimestamp(); Collection<W> assignedWindows = windowFn.assignWindows(new TestAssignContext<W>( - windowFn, value, timestamp, Arrays.asList(GlobalWindow.INSTANCE))); + windowFn, value, timestamp, GlobalWindow.INSTANCE)); for (W window : assignedWindows) { activeWindows.addActiveForTesting(window); @@ -401,14 +401,14 @@ public class TriggerTester<InputT, W extends BoundedWindow> { extends WindowFn<Object, W>.AssignContext { private Object element; private Instant timestamp; - private Collection<? extends BoundedWindow> windows; + private BoundedWindow window; - public TestAssignContext(WindowFn<Object, W> windowFn, Object element, Instant timestamp, - Collection<? extends BoundedWindow> windows) { + public TestAssignContext( + WindowFn<Object, W> windowFn, Object element, Instant timestamp, BoundedWindow window) { windowFn.super(); this.element = element; this.timestamp = timestamp; - this.windows = windows; + this.window = window; } @Override @@ -422,8 +422,8 @@ public class TriggerTester<InputT, W extends BoundedWindow> { } @Override - public Collection<? 
extends BoundedWindow> windows() { - return windows; + public BoundedWindow window() { + return window; } }
