Replaced static Window.blah() methods with Window.configure().blah() except Window.into()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6b67e547 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6b67e547 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6b67e547 Branch: refs/heads/master Commit: 6b67e547aab7658bcb6dfdf6eb5bf7e220ef7558 Parents: 876d13d Author: Eugene Kirpichov <[email protected]> Authored: Wed Mar 29 12:58:20 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Mar 31 10:59:38 2017 -0700 ---------------------------------------------------------------------- .../direct/ParDoMultiOverrideFactory.java | 4 +- .../translation/streaming/CreateStreamTest.java | 4 +- .../org/apache/beam/sdk/testing/PAssert.java | 6 +- .../org/apache/beam/sdk/transforms/DoFn.java | 8 +- .../apache/beam/sdk/transforms/GroupByKey.java | 4 +- .../beam/sdk/transforms/WithTimestamps.java | 10 +-- .../beam/sdk/transforms/windowing/Never.java | 2 +- .../beam/sdk/transforms/windowing/Window.java | 80 +++----------------- .../apache/beam/sdk/testing/TestStreamTest.java | 6 +- .../sdk/transforms/windowing/WindowTest.java | 24 +++--- 10 files changed, 47 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 4604fcc..056a0c3 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -135,8 +135,8 @@ class ParDoMultiOverrideFactory<InputT, OutputT> // - ensure this GBK holds to the minimum of those timestamps (via OutputTimeFn) // - discard past panes as it is "just a stream" of elements .apply( - Window.<KV<K, WindowedValue<KV<K, InputT>>>>triggering( - Repeatedly.forever(AfterPane.elementCountAtLeast(1))) + Window.<KV<K, WindowedValue<KV<K, InputT>>>>configure() + .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))) .discardingFiredPanes() .withAllowedLateness(inputWindowingStrategy.getAllowedLateness()) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java index 75abc8b..78b8039 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java @@ -275,7 +275,7 @@ public class CreateStreamTest implements Serializable { PCollection<String> createStrings = p.apply("CreateStrings", source) .apply("WindowStrings", - Window.<String>triggering(AfterPane.elementCountAtLeast(2)) + Window.<String>configure().triggering(AfterPane.elementCountAtLeast(2)) .withAllowedLateness(Duration.ZERO) .accumulatingFiredPanes()); PAssert.that(createStrings).containsInAnyOrder("foo", "bar"); @@ -283,7 +283,7 @@ public class CreateStreamTest implements Serializable { PCollection<Integer> createInts = p.apply("CreateInts", other) .apply("WindowInts", - Window.<Integer>triggering(AfterPane.elementCountAtLeast(4)) + Window.<Integer>configure().triggering(AfterPane.elementCountAtLeast(4)) .withAllowedLateness(Duration.ZERO) .accumulatingFiredPanes()); PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4); http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index 56df449..ab412c4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -939,7 +939,8 @@ public class PAssert { PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>, PCollection<KV<Integer, Iterable<ValueInSingleWindow<T>>>>> removeTriggering = - Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever()) + Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure() + .triggering(Never.ever()) .discardingFiredPanes() .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()); // Group the contents by key. If it is empty, this PCollection will be empty, too. @@ -983,7 +984,8 @@ public class PAssert { Flatten.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>pCollections()) .apply( "NeverTrigger", - Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>triggering(Never.ever()) + Window.<KV<Integer, Iterable<ValueInSingleWindow<T>>>>configure() + .triggering(Never.ever()) .withAllowedLateness(input.getWindowingStrategy().getAllowedLateness()) .discardingFiredPanes()) .apply( http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index a7730f0..e43527a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -279,8 +279,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD /** * Returns the timestamp of the input element. * - * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. + * <p>See {@link Window} for more information. */ public abstract Instant timestamp(); @@ -290,8 +289,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * * <p>Generally all data is in a single, uninteresting pane unless custom * triggering and/or late data has been explicitly requested. - * See {@link org.apache.beam.sdk.transforms.windowing.Window} - * for more information. + * See {@link Window} for more information. */ public abstract PaneInfo pane(); } @@ -326,7 +324,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These * elements are considered late, and if behind the - * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream * {@link PCollection} may be silently dropped. See * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. * http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index adf189b..f68b1f3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * each distinct key and window of the input {@code PCollection} to an * {@code Iterable} over all the values associated with that key in * the input per window. Absent repeatedly-firing - * {@link Window#triggering triggering}, each key in the output + * {@link Window.Bound#triggering triggering}, each key in the output * {@code PCollection} is unique within each window. * * <p>{@code GroupByKey} is analogous to converting a multi-map into @@ -104,7 +104,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * <p>If the input {@code PCollection} contains late data (see * {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel} * for an example of how this can occur) or the - * {@link Window#triggering requested TriggerFn} can fire before + * {@link Window.Bound#triggering requested TriggerFn} can fire before * the watermark, then there may be multiple elements * output by a {@code GroupByKey} that correspond to the same key and window. * http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index 6f20226..39cf6c6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -52,9 +52,9 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * * <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted * behind the watermark. These elements are considered late, and if behind the {@link - * Window#withAllowedLateness(Duration) allowed lateness} of a downstream {@link PCollection} may - * be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 for details on a - * replacement. + * Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link PCollection} may be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 + * for details on a replacement. * * <p>Each output element will be in the same windows as the input element. If a new window based * on the new output timestamp is desired, apply a new instance of {@link Window#into(WindowFn)}. @@ -90,7 +90,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}. * @deprecated This method permits a to elements to be emitted behind the watermark. These * elements are considered late, and if behind the - * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream * {@link PCollection} may be silently dropped. See * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. */ @@ -106,7 +106,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * @see DoFn#getAllowedTimestampSkew() * @deprecated This method permits a to elements to be emitted behind the watermark. These * elements are considered late, and if behind the - * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream * {@link PCollection} may be silently dropped. See * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. */ http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 664ae83..d8cb96d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -26,7 +26,7 @@ import org.joda.time.Instant; * A trigger which never fires. * * <p>Using this trigger will only produce output when the watermark passes the end of the - * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed + * {@link BoundedWindow window} plus the {@link Window.Bound#withAllowedLateness allowed * lateness}. */ public final class Never { http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index a6c7adf..188554a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -170,62 +170,18 @@ public class Window { * properties can be set on it first. */ public static <T> Bound<T> into(WindowFn<? super T, ?> fn) { - return Window.<T>configure().into(fn); - } - - /** - * Sets a non-default trigger for this {@code Window} {@code PTransform}. - * Elements that are assigned to a specific window will be output when - * the trigger fires. - * - * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation - * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. - */ - @Experimental(Kind.TRIGGER) - public static <T> Bound<T> triggering(Trigger trigger) { - return Window.<T>configure().triggering(trigger); - } - - /** - * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and - * Triggering behavior, and that discards elements in a pane after they are triggered. - * - * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently - * specified to be applied, but more properties can still be specified. - */ - @Experimental(Kind.TRIGGER) - public static <T> Bound<T> discardingFiredPanes() { - return Window.<T>configure().discardingFiredPanes(); - } - - /** - * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and - * Triggering behavior, and that accumulates elements in a pane after they are triggered. - * - * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently - * specified to be applied, but more properties can still be specified. - */ - @Experimental(Kind.TRIGGER) - public static <T> Bound<T> accumulatingFiredPanes() { - return Window.<T>configure().accumulatingFiredPanes(); + try { + fn.windowCoder().verifyDeterministic(); + } catch (NonDeterministicException e) { + throw new IllegalArgumentException("Window coders must be deterministic.", e); + } + return Window.<T>configure().withWindowFn(fn); } /** - * Override the amount of lateness allowed for data elements in the output {@link PCollection}, - * and downstream {@link PCollection PCollections} until explicitly set again. Like - * the other properties on this {@link Window} operation, this will be applied at - * the next {@link GroupByKey}. Any elements that are later than this as decided by - * the system-maintained watermark will be dropped. - * - * <p>This value also determines how long state will be kept around for old windows. - * Once no elements will be added to a window (because this duration has passed) any state - * associated with the window will be cleaned up. + * Returns a new builder for a {@link Window} transform for setting windowing parameters other + * than the windowing function. */ - @Experimental(Kind.TRIGGER) - public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) { - return Window.<T>configure().withAllowedLateness(allowedLateness); - } - public static <T> Bound<T> configure() { return new AutoValue_Window_Bound.Builder<T>().build(); } @@ -261,20 +217,7 @@ public class Window { abstract Bound<T> build(); } - /** - * Returns a new {@code Window} {@code PTransform} that's like this - * transform but that will use the given {@link WindowFn}, and that has - * its input and output types bound. Does not modify this transform. The - * resulting {@code PTransform} is sufficiently specified to be applied, - * but more properties can still be specified. - */ - private Bound<T> into(WindowFn<? super T, ?> windowFn) { - try { - windowFn.windowCoder().verifyDeterministic(); - } catch (NonDeterministicException e) { - throw new IllegalArgumentException("Window coders must be deterministic.", e); - } - + private Bound<T> withWindowFn(WindowFn<? super T, ?> windowFn) { return toBuilder().setWindowFn(windowFn).build(); } @@ -319,8 +262,9 @@ public class Window { } /** - * Override the amount of lateness allowed for data elements in the pipeline. Like - * the other properties on this {@link Window} operation, this will be applied at + * Override the amount of lateness allowed for data elements in the output {@link PCollection} + * and downstream {@link PCollection PCollections} until explicitly set again. + * Like the other properties on this {@link Window} operation, this will be applied at * the next {@link GroupByKey}. Any elements that are later than this as decided by * the system-maintained watermark will be dropped. * http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java index 614831d..5cb7634 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java @@ -146,7 +146,7 @@ public class TestStreamTest implements Serializable { .advanceWatermarkToInfinity(); PCollection<Long> sum = p.apply(source) - .apply(Window.<Long>triggering(AfterWatermark.pastEndOfWindow() + .apply(Window.<Long>configure().triggering(AfterWatermark.pastEndOfWindow() .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(5)))).accumulatingFiredPanes() .withAllowedLateness(Duration.ZERO)) @@ -272,14 +272,14 @@ public class TestStreamTest implements Serializable { PCollection<String> createStrings = p.apply("CreateStrings", stream) .apply("WindowStrings", - Window.<String>triggering(AfterPane.elementCountAtLeast(2)) + Window.<String>configure().triggering(AfterPane.elementCountAtLeast(2)) .withAllowedLateness(Duration.ZERO) .accumulatingFiredPanes()); PAssert.that(createStrings).containsInAnyOrder("foo", "bar"); PCollection<Integer> createInts = p.apply("CreateInts", other) .apply("WindowInts", - Window.<Integer>triggering(AfterPane.elementCountAtLeast(4)) + Window.<Integer>configure().triggering(AfterPane.elementCountAtLeast(4)) .withAllowedLateness(Duration.ZERO) .accumulatingFiredPanes()); PAssert.that(createInts).containsInAnyOrder(1, 2, 3, 4); http://git-wip-us.apache.org/repos/asf/beam/blob/6b67e547/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 2bc8d86..979179d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -136,9 +136,9 @@ public class WindowTest implements Serializable { Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); WindowingStrategy<?, ?> strategy = pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) - .apply("Mode", Window.<String>accumulatingFiredPanes()) - .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1))) - .apply("Trigger", Window.<String>triggering(trigger)) + .apply("Mode", Window.<String>configure().accumulatingFiredPanes()) + .apply("Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1))) + .apply("Trigger", Window.<String>configure().triggering(trigger)) .apply("Window", Window.<String>into(fixed10)) .getWindowingStrategy(); @@ -199,7 +199,7 @@ public class WindowTest implements Serializable { pipeline .apply(Create.of(1, 2, 3)) .apply( - Window.<Integer>triggering(AfterWatermark.pastEndOfWindow()) + Window.<Integer>configure().triggering(AfterWatermark.pastEndOfWindow()) .withAllowedLateness(Duration.ZERO) .accumulatingFiredPanes()); @@ -247,7 +247,9 @@ public class WindowTest implements Serializable { thrown.expectMessage("requires that the accumulation mode"); input.apply( "Triggering", - Window.<String>withAllowedLateness(Duration.standardDays(1)).triggering(trigger)); + Window.<String>configure() + .withAllowedLateness(Duration.standardDays(1)) + .triggering(trigger)); } @Test @@ -260,8 +262,8 @@ public class WindowTest implements Serializable { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("allowed lateness"); thrown.expectMessage("accumulation mode be specified"); - input - .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1))); + input.apply( + "Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1))); } @Test @@ -273,9 +275,9 @@ public class WindowTest implements Serializable { thrown.expectMessage("requires that the allowed lateness"); pipeline .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) - .apply("Mode", Window.<String>accumulatingFiredPanes()) + .apply("Mode", Window.<String>configure().accumulatingFiredPanes()) .apply("Window", Window.<String>into(fixed10)) - .apply("Trigger", Window.<String>triggering(trigger)); + .apply("Trigger", Window.<String>configure().triggering(trigger)); } private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> { @@ -353,7 +355,7 @@ public class WindowTest implements Serializable { PCollection<Boolean> updatedTrigger = upOne.apply( "UpdateWindowingStrategy", - Window.<Boolean>triggering(Never.ever()) + Window.<Boolean>configure().triggering(Never.ever()) .withAllowedLateness(Duration.ZERO) .accumulatingFiredPanes()); pipeline.run(); @@ -501,7 +503,7 @@ public class WindowTest implements Serializable { @Test public void testDisplayDataExcludesUnspecifiedProperties() { - Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes(); + Window.Bound<?> onlyHasAccumulationMode = Window.<Object>configure().discardingFiredPanes(); assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf( "windowFn", "trigger",
