Replaces Window.Bound with simply Window
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6848950c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6848950c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6848950c Branch: refs/heads/master Commit: 6848950cca5bee2dddc18ddca229a5deb9e34754 Parents: 6b67e54 Author: Eugene Kirpichov <[email protected]> Authored: Wed Mar 29 13:09:49 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Mar 31 10:59:38 2017 -0700 ---------------------------------------------------------------------- .../translation/WindowAssignTranslator.java | 2 +- .../direct/WindowEvaluatorFactoryTest.java | 7 +- .../beam/runners/dataflow/AssignWindows.java | 4 +- .../dataflow/ReshuffleOverrideFactory.java | 2 +- .../org/apache/beam/sdk/testing/PAssert.java | 4 +- .../org/apache/beam/sdk/transforms/DoFn.java | 2 +- .../apache/beam/sdk/transforms/GroupByKey.java | 6 +- .../beam/sdk/transforms/WithTimestamps.java | 6 +- .../beam/sdk/transforms/windowing/Never.java | 2 +- .../beam/sdk/transforms/windowing/PaneInfo.java | 2 +- .../beam/sdk/transforms/windowing/Window.java | 447 +++++++++---------- .../org/apache/beam/sdk/util/Reshuffle.java | 2 +- .../java/org/apache/beam/sdk/io/WriteTest.java | 4 +- .../org/apache/beam/sdk/transforms/TopTest.java | 3 +- .../sdk/transforms/windowing/WindowTest.java | 17 +- 15 files changed, 249 insertions(+), 261 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java index b3aef8d..6106f75 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowAssignTranslator.java @@ -35,7 +35,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; /** - * {@link Window.Bound} is translated to {link ApexParDoOperator} that wraps an {@link + * {@link Window} is translated to {link ApexParDoOperator} that wraps an {@link * AssignWindowsDoFn}. */ class WindowAssignTranslator<T> implements TransformTranslator<Window.Assign<T>> { http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index a71a75b..eb58629 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -43,7 +43,6 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.Window.Bound; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.WindowedValue; @@ -113,7 +112,7 @@ public class WindowEvaluatorFactoryTest { @Test public void singleWindowFnSucceeds() throws Exception { Duration windowDuration = Duration.standardDays(7); - Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration)); + Window<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration)); PCollection<Long> windowed = input.apply(transform); CommittedBundle<Long> inputBundle = createInputBundle(); @@ -152,7 +151,7 @@ public class WindowEvaluatorFactoryTest { public void multipleWindowsWindowFnSucceeds() throws Exception { Duration windowDuration = Duration.standardDays(6); Duration slidingBy = Duration.standardDays(3); - Bound<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy)); + Window<Long> transform = Window.into(SlidingWindows.of(windowDuration).every(slidingBy)); PCollection<Long> windowed = input.apply(transform); CommittedBundle<Long> inputBundle = createInputBundle(); @@ -209,7 +208,7 @@ public class WindowEvaluatorFactoryTest { @Test public void referencesEarlierWindowsSucceeds() throws Exception { - Bound<Long> transform = Window.into(new EvaluatorTestWindowFn()); + Window<Long> transform = Window.into(new EvaluatorTestWindowFn()); PCollection<Long> windowed = input.apply(transform); CommittedBundle<Long> inputBundle = createInputBundle(); http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java index 880cd26..3e36899 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/AssignWindows.java @@ -42,13 +42,13 @@ import org.apache.beam.sdk.values.PCollection; * @param <T> the type of input element */ class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final Window.Bound<T> transform; + private final Window<T> transform; /** * Builds an instance of this class from the overriden transform. */ @SuppressWarnings("unused") // Used via reflection - public AssignWindows(Window.Bound<T> transform) { + public AssignWindows(Window<T> transform) { this.transform = transform; } http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java index 230f5dc..2e6455d 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/ReshuffleOverrideFactory.java @@ -56,7 +56,7 @@ class ReshuffleOverrideFactory<K, V> // If the input has already had its windows merged, then the GBK that performed the merge // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged. - Window.Bound<KV<K, V>> rewindow = + Window<KV<K, V>> rewindow = Window.<KV<K, V>>into( new IdentityWindowFn<>( originalStrategy.getWindowFn().windowCoder())) http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index ab412c4..92dca53 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -145,7 +145,7 @@ public class PAssert { * <p>If the input {@link WindowingStrategy} does not always produce final panes, the assertion * may be executed over an empty input even if the trigger has fired previously. To ensure that * a final pane is always produced, set the {@link ClosingBehavior} of the windowing strategy - * (via {@link Window.Bound#withAllowedLateness(Duration, ClosingBehavior)} setting + * (via {@link Window#withAllowedLateness(Duration, ClosingBehavior)} setting * {@link ClosingBehavior} to {@link ClosingBehavior#FIRE_ALWAYS}). * * @return a new {@link IterableAssert} like this one but with the assertion only applied to the @@ -233,7 +233,7 @@ public class PAssert { * <p>If the input {@link WindowingStrategy} does not always produce final panes, the assertion * may be executed over an empty input even if the trigger has fired previously. To ensure that * a final pane is always produced, set the {@link ClosingBehavior} of the windowing strategy - * (via {@link Window.Bound#withAllowedLateness(Duration, ClosingBehavior)} setting + * (via {@link Window#withAllowedLateness(Duration, ClosingBehavior)} setting * {@link ClosingBehavior} to {@link ClosingBehavior#FIRE_ALWAYS}). * * @return a new {@link SingletonAssert} like this one but with the assertion only applied to http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index e43527a..de33612 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -324,7 +324,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * * @deprecated This method permits a {@link DoFn} to emit elements behind the watermark. These * elements are considered late, and if behind the - * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream * {@link PCollection} may be silently dropped. See * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. * http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java index f68b1f3..d228dbb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java @@ -39,7 +39,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * each distinct key and window of the input {@code PCollection} to an * {@code Iterable} over all the values associated with that key in * the input per window. Absent repeatedly-firing - * {@link Window.Bound#triggering triggering}, each key in the output + * {@link Window#triggering triggering}, each key in the output * {@code PCollection} is unique within each window. * * <p>{@code GroupByKey} is analogous to converting a multi-map into @@ -97,14 +97,14 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; * for details on the estimation. * * <p>The timestamp for each emitted pane is determined by the - * {@link Window.Bound#withOutputTimeFn windowing operation}. + * {@link Window#withOutputTimeFn windowing operation}. * The output {@code PCollection} will have the same {@link WindowFn} * as the input. * * <p>If the input {@code PCollection} contains late data (see * {@link org.apache.beam.sdk.io.PubsubIO.Read#timestampLabel} * for an example of how this can occur) or the - * {@link Window.Bound#triggering requested TriggerFn} can fire before + * {@link Window#triggering requested TriggerFn} can fire before * the watermark, then there may be multiple elements * output by a {@code GroupByKey} that correspond to the same key and window. * http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index 39cf6c6..c4df2fb 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -52,7 +52,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * * <p>CAUTION: Use of {@link #withAllowedTimestampSkew(Duration)} permits elements to be emitted * behind the watermark. These elements are considered late, and if behind the {@link - * Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream + * Window#withAllowedLateness(Duration) allowed lateness} of a downstream * {@link PCollection} may be silently dropped. See https://issues.apache.org/jira/browse/BEAM-644 * for details on a replacement. * @@ -90,7 +90,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * future. For infinite skew, use {@code new Duration(Long.MAX_VALUE)}. * @deprecated This method permits a to elements to be emitted behind the watermark. These * elements are considered late, and if behind the - * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream * {@link PCollection} may be silently dropped. See * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. */ @@ -106,7 +106,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> * @see DoFn#getAllowedTimestampSkew() * @deprecated This method permits a to elements to be emitted behind the watermark. These * elements are considered late, and if behind the - * {@link Window.Bound#withAllowedLateness(Duration) allowed lateness} of a downstream + * {@link Window#withAllowedLateness(Duration) allowed lateness} of a downstream * {@link PCollection} may be silently dropped. See * https://issues.apache.org/jira/browse/BEAM-644 for details on a replacement. */ http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index d8cb96d..664ae83 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -26,7 +26,7 @@ import org.joda.time.Instant; * A trigger which never fires. * * <p>Using this trigger will only produce output when the watermark passes the end of the - * {@link BoundedWindow window} plus the {@link Window.Bound#withAllowedLateness allowed + * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness allowed * lateness}. */ public final class Never { http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java index de5b1e1..1fa1f49 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PaneInfo.java @@ -72,7 +72,7 @@ public final class PaneInfo { * <li>We'll call a pipeline 'simple' if it does not use * {@link DoFn.Context#outputWithTimestamp} in * any {@link DoFn}, and it uses the same - * {@link org.apache.beam.sdk.transforms.windowing.Window.Bound#withAllowedLateness} + * {@link org.apache.beam.sdk.transforms.windowing.Window#withAllowedLateness} * argument value on all windows (or uses the default of {@link org.joda.time.Duration#ZERO}). * <li>We'll call an element 'locally late', from the point of view of a computation on a * worker, if the element's timestamp is before the input watermark for that computation http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 188554a..58425e0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -89,7 +89,7 @@ import org.joda.time.Duration; * * <h2>Triggers</h2> * - * <p>{@link Window.Bound#triggering(Trigger)} allows specifying a trigger to control when + * <p>{@link Window#triggering(Trigger)} allows specifying a trigger to control when * (in processing time) results for the given window can be produced. If unspecified, the default * behavior is to trigger first when the watermark passes the end of the window, and then trigger * again every time there is late arriving data. @@ -139,8 +139,8 @@ import org.joda.time.Duration; * * <p>See {@link Trigger} for details on the available triggers. */ -public class Window { - +@AutoValue +public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T>> { /** * Specifies the conditions under which a final pane will be created when a window is permanently * closed. @@ -169,7 +169,7 @@ public class Window { * the argument {@code WindowFn}. It is ready to be applied, or further * properties can be set on it first. */ - public static <T> Bound<T> into(WindowFn<? super T, ?> fn) { + public static <T> Window<T> into(WindowFn<? super T, ?> fn) { try { fn.windowCoder().verifyDeterministic(); } catch (NonDeterministicException e) { @@ -182,265 +182,256 @@ public class Window { * Returns a new builder for a {@link Window} transform for setting windowing parameters other * than the windowing function. */ - public static <T> Bound<T> configure() { - return new AutoValue_Window_Bound.Builder<T>().build(); + public static <T> Window<T> configure() { + return new AutoValue_Window.Builder<T>().build(); + } + + @Nullable + public abstract WindowFn<? super T, ?> getWindowFn(); + + @Nullable abstract Trigger getTrigger(); + @Nullable abstract AccumulationMode getAccumulationMode(); + @Nullable abstract Duration getAllowedLateness(); + @Nullable abstract ClosingBehavior getClosingBehavior(); + @Nullable abstract OutputTimeFn<?> getOutputTimeFn(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setWindowFn(WindowFn<? super T, ?> windowFn); + abstract Builder<T> setTrigger(Trigger trigger); + abstract Builder<T> setAccumulationMode(AccumulationMode mode); + abstract Builder<T> setAllowedLateness(Duration allowedLateness); + abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior); + abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn); + + abstract Window<T> build(); + } + + private Window<T> withWindowFn(WindowFn<? super T, ?> windowFn) { + return toBuilder().setWindowFn(windowFn).build(); } /** - * A {@code PTransform} that windows the elements of a {@code PCollection<T>}, - * into finite windows according to a user-specified {@code WindowFn}. + * Sets a non-default trigger for this {@code Window} {@code PTransform}. + * Elements that are assigned to a specific window will be output when + * the trigger fires. * - * @param <T> The type of elements this {@code Window} is applied to + * <p>{@link org.apache.beam.sdk.transforms.windowing.Trigger} + * has more details on the available triggers. + * + * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation + * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. */ - @AutoValue - public abstract static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> { - @Nullable - public abstract WindowFn<? super T, ?> getWindowFn(); - - @Nullable abstract Trigger getTrigger(); - @Nullable abstract AccumulationMode getAccumulationMode(); - @Nullable abstract Duration getAllowedLateness(); - @Nullable abstract ClosingBehavior getClosingBehavior(); - @Nullable abstract OutputTimeFn<?> getOutputTimeFn(); - - abstract Builder<T> toBuilder(); - - @AutoValue.Builder - abstract static class Builder<T> { - abstract Builder<T> setWindowFn(WindowFn<? super T, ?> windowFn); - abstract Builder<T> setTrigger(Trigger trigger); - abstract Builder<T> setAccumulationMode(AccumulationMode mode); - abstract Builder<T> setAllowedLateness(Duration allowedLateness); - abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior); - abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn); - - abstract Bound<T> build(); - } + @Experimental(Kind.TRIGGER) + public Window<T> triggering(Trigger trigger) { + return toBuilder().setTrigger(trigger).build(); + } - private Bound<T> withWindowFn(WindowFn<? super T, ?> windowFn) { - return toBuilder().setWindowFn(windowFn).build(); - } + /** + * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and + * Triggering behavior, and that discards elements in a pane after they are triggered. + * + * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently + * specified to be applied, but more properties can still be specified. + */ + @Experimental(Kind.TRIGGER) + public Window<T> discardingFiredPanes() { + return toBuilder().setAccumulationMode(AccumulationMode.DISCARDING_FIRED_PANES).build(); + } + + /** + * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and + * Triggering behavior, and that accumulates elements in a pane after they are triggered. + * + * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently + * specified to be applied, but more properties can still be specified. + */ + @Experimental(Kind.TRIGGER) + public Window<T> accumulatingFiredPanes() { + return toBuilder().setAccumulationMode(AccumulationMode.ACCUMULATING_FIRED_PANES).build(); + } - /** - * Sets a non-default trigger for this {@code Window} {@code PTransform}. - * Elements that are assigned to a specific window will be output when - * the trigger fires. - * - * <p>{@link org.apache.beam.sdk.transforms.windowing.Trigger} - * has more details on the available triggers. - * - * <p>Must also specify allowed lateness using {@link #withAllowedLateness} and accumulation - * mode using either {@link #discardingFiredPanes()} or {@link #accumulatingFiredPanes()}. - */ - @Experimental(Kind.TRIGGER) - public Bound<T> triggering(Trigger trigger) { - return toBuilder().setTrigger(trigger).build(); - } + /** + * Override the amount of lateness allowed for data elements in the output {@link PCollection} + * and downstream {@link PCollection PCollections} until explicitly set again. + * Like the other properties on this {@link Window} operation, this will be applied at + * the next {@link GroupByKey}. Any elements that are later than this as decided by + * the system-maintained watermark will be dropped. + * + * <p>This value also determines how long state will be kept around for old windows. + * Once no elements will be added to a window (because this duration has passed) any state + * associated with the window will be cleaned up. + * + * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See + * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details. + */ + @Experimental(Kind.TRIGGER) + public Window<T> withAllowedLateness(Duration allowedLateness) { + return toBuilder().setAllowedLateness(allowedLateness).build(); + } - /** - * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and - * Triggering behavior, and that discards elements in a pane after they are triggered. - * - * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently - * specified to be applied, but more properties can still be specified. - */ - @Experimental(Kind.TRIGGER) - public Bound<T> discardingFiredPanes() { - return toBuilder().setAccumulationMode(AccumulationMode.DISCARDING_FIRED_PANES).build(); - } - - /** - * Returns a new {@code Window} {@code PTransform} that uses the registered WindowFn and - * Triggering behavior, and that accumulates elements in a pane after they are triggered. - * - * <p>Does not modify this transform. The resulting {@code PTransform} is sufficiently - * specified to be applied, but more properties can still be specified. - */ - @Experimental(Kind.TRIGGER) - public Bound<T> accumulatingFiredPanes() { - return toBuilder().setAccumulationMode(AccumulationMode.ACCUMULATING_FIRED_PANES).build(); - } + /** + * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control + * the output timestamp of values output from a {@link GroupByKey} operation. + */ + @Experimental(Kind.OUTPUT_TIME) + public Window<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { + return toBuilder().setOutputTimeFn(outputTimeFn).build(); + } - /** - * Override the amount of lateness allowed for data elements in the output {@link PCollection} - * and downstream {@link PCollection PCollections} until explicitly set again. - * Like the other properties on this {@link Window} operation, this will be applied at - * the next {@link GroupByKey}. Any elements that are later than this as decided by - * the system-maintained watermark will be dropped. - * - * <p>This value also determines how long state will be kept around for old windows. - * Once no elements will be added to a window (because this duration has passed) any state - * associated with the window will be cleaned up. - * - * <p>Depending on the trigger this may not produce a pane with {@link PaneInfo#isLast}. See - * {@link ClosingBehavior#FIRE_IF_NON_EMPTY} for more details. - */ - @Experimental(Kind.TRIGGER) - public Bound<T> withAllowedLateness(Duration allowedLateness) { - return toBuilder().setAllowedLateness(allowedLateness).build(); - } + /** + * Override the amount of lateness allowed for data elements in the pipeline. Like + * the other properties on this {@link Window} operation, this will be applied at + * the next {@link GroupByKey}. Any elements that are later than this as decided by + * the system-maintained watermark will be dropped. + * + * <p>This value also determines how long state will be kept around for old windows. + * Once no elements will be added to a window (because this duration has passed) any state + * associated with the window will be cleaned up. + */ + @Experimental(Kind.TRIGGER) + public Window<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) { + return toBuilder().setAllowedLateness(allowedLateness).setClosingBehavior(behavior).build(); + } - /** - * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control - * the output timestamp of values output from a {@link GroupByKey} operation. - */ - @Experimental(Kind.OUTPUT_TIME) - public Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { - return toBuilder().setOutputTimeFn(outputTimeFn).build(); + /** + * Get the output strategy of this {@link Window Window PTransform}. For internal use + * only. + */ + // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is + // casting between wildcards + public WindowingStrategy<?, ?> getOutputStrategyInternal( + WindowingStrategy<?, ?> inputStrategy) { + WindowingStrategy<?, ?> result = inputStrategy; + if (getWindowFn() != null) { + result = result.withWindowFn(getWindowFn()); } - - /** - * Override the amount of lateness allowed for data elements in the pipeline. Like - * the other properties on this {@link Window} operation, this will be applied at - * the next {@link GroupByKey}. Any elements that are later than this as decided by - * the system-maintained watermark will be dropped. - * - * <p>This value also determines how long state will be kept around for old windows. - * Once no elements will be added to a window (because this duration has passed) any state - * associated with the window will be cleaned up. - */ - @Experimental(Kind.TRIGGER) - public Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) { - return toBuilder().setAllowedLateness(allowedLateness).setClosingBehavior(behavior).build(); + if (getTrigger() != null) { + result = result.withTrigger(getTrigger()); } - - /** - * Get the output strategy of this {@link Window.Bound Window PTransform}. For internal use - * only. - */ - // Rawtype cast of OutputTimeFn cannot be eliminated with intermediate variable, as it is - // casting between wildcards - public WindowingStrategy<?, ?> getOutputStrategyInternal( - WindowingStrategy<?, ?> inputStrategy) { - WindowingStrategy<?, ?> result = inputStrategy; - if (getWindowFn() != null) { - result = result.withWindowFn(getWindowFn()); - } - if (getTrigger() != null) { - result = result.withTrigger(getTrigger()); - } - if (getAccumulationMode() != null) { - result = result.withMode(getAccumulationMode()); - } - if (getAllowedLateness() != null) { - result = result.withAllowedLateness(getAllowedLateness()); - } - if (getClosingBehavior() != null) { - result = result.withClosingBehavior(getClosingBehavior()); - } - if (getOutputTimeFn() != null) { - result = result.withOutputTimeFn(getOutputTimeFn()); - } - return result; + if (getAccumulationMode() != null) { + result = result.withMode(getAccumulationMode()); } - - @Override - public void validate(PCollection<T> input) { - WindowingStrategy<?, ?> outputStrategy = - getOutputStrategyInternal(input.getWindowingStrategy()); - - // Make sure that the windowing strategy is complete & valid. - if (outputStrategy.isTriggerSpecified() - && !(outputStrategy.getTrigger() instanceof DefaultTrigger) - && !(outputStrategy.getWindowFn() instanceof GlobalWindows) - && !outputStrategy.isAllowedLatenessSpecified()) { - throw new IllegalArgumentException( - "Except when using GlobalWindows," - + " calling .triggering() to specify a trigger requires that the allowed lateness" - + " be specified using .withAllowedLateness() to set the upper bound on how late" - + " data can arrive before being dropped. See Javadoc for more details."); - } - - if (!outputStrategy.isModeSpecified() && canProduceMultiplePanes(outputStrategy)) { - throw new IllegalArgumentException( - "Calling .triggering() to specify a trigger or calling .withAllowedLateness() to" - + " specify an allowed lateness greater than zero requires that the accumulation" - + " mode be specified using .discardingFiredPanes() or .accumulatingFiredPanes()." - + " See Javadoc for more details."); - } + if (getAllowedLateness() != null) { + result = result.withAllowedLateness(getAllowedLateness()); + } + if (getClosingBehavior() != null) { + result = result.withClosingBehavior(getClosingBehavior()); + } + if (getOutputTimeFn() != null) { + result = result.withOutputTimeFn(getOutputTimeFn()); } + return result; + } - private boolean canProduceMultiplePanes(WindowingStrategy<?, ?> strategy) { - // The default trigger is Repeatedly.forever(AfterWatermark.pastEndOfWindow()); This fires - // for every late-arriving element if allowed lateness is nonzero, and thus we must have - // an accumulating mode specified - boolean dataCanArriveLate = - !(strategy.getWindowFn() instanceof GlobalWindows) - && strategy.getAllowedLateness().getMillis() > 0; - boolean hasCustomTrigger = !(strategy.getTrigger() instanceof DefaultTrigger); - return dataCanArriveLate || hasCustomTrigger; + @Override + public void validate(PCollection<T> input) { + WindowingStrategy<?, ?> outputStrategy = + getOutputStrategyInternal(input.getWindowingStrategy()); + + // Make sure that the windowing strategy is complete & valid. + if (outputStrategy.isTriggerSpecified() + && !(outputStrategy.getTrigger() instanceof DefaultTrigger) + && !(outputStrategy.getWindowFn() instanceof GlobalWindows) + && !outputStrategy.isAllowedLatenessSpecified()) { + throw new IllegalArgumentException( + "Except when using GlobalWindows," + + " calling .triggering() to specify a trigger requires that the allowed lateness" + + " be specified using .withAllowedLateness() to set the upper bound on how late" + + " data can arrive before being dropped. See Javadoc for more details."); } - @Override - public PCollection<T> expand(PCollection<T> input) { - WindowingStrategy<?, ?> outputStrategy = - getOutputStrategyInternal(input.getWindowingStrategy()); - if (getWindowFn() == null) { - // A new PCollection must be created in case input is reused in a different location as the - // two PCollections will, in general, have a different windowing strategy. - return PCollectionList.of(input) - .apply(Flatten.<T>pCollections()) - .setWindowingStrategyInternal(outputStrategy); - } else { - // This is the AssignWindows primitive - return input.apply(new Assign<>(this, outputStrategy)); - } + if (!outputStrategy.isModeSpecified() && canProduceMultiplePanes(outputStrategy)) { + throw new IllegalArgumentException( + "Calling .triggering() to specify a trigger or calling .withAllowedLateness() to" + + " specify an allowed lateness greater than zero requires that the accumulation" + + " mode be specified using .discardingFiredPanes() or .accumulatingFiredPanes()." + + " See Javadoc for more details."); } + } - @Override - public void populateDisplayData(DisplayData.Builder builder) { - super.populateDisplayData(builder); + private boolean canProduceMultiplePanes(WindowingStrategy<?, ?> strategy) { + // The default trigger is Repeatedly.forever(AfterWatermark.pastEndOfWindow()); This fires + // for every late-arriving element if allowed lateness is nonzero, and thus we must have + // an accumulating mode specified + boolean dataCanArriveLate = + !(strategy.getWindowFn() instanceof GlobalWindows) + && strategy.getAllowedLateness().getMillis() > 0; + boolean hasCustomTrigger = !(strategy.getTrigger() instanceof DefaultTrigger); + return dataCanArriveLate || hasCustomTrigger; + } - if (getWindowFn() != null) { - builder - .add(DisplayData.item("windowFn", getWindowFn().getClass()) - .withLabel("Windowing Function")) - .include("windowFn", getWindowFn()); - } + @Override + public PCollection<T> expand(PCollection<T> input) { + WindowingStrategy<?, ?> outputStrategy = + getOutputStrategyInternal(input.getWindowingStrategy()); + if (getWindowFn() == null) { + // A new PCollection must be created in case input is reused in a different location as the + // two PCollections will, in general, have a different windowing strategy. + return PCollectionList.of(input) + .apply(Flatten.<T>pCollections()) + .setWindowingStrategyInternal(outputStrategy); + } else { + // This is the AssignWindows primitive + return input.apply(new Assign<>(this, outputStrategy)); + } + } - if (getAllowedLateness() != null) { - builder.addIfNotDefault(DisplayData.item("allowedLateness", getAllowedLateness()) - .withLabel("Allowed Lateness"), - Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); - } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); - if (getTrigger() != null && !(getTrigger() instanceof DefaultTrigger)) { - builder.add(DisplayData.item("trigger", getTrigger().toString()) - .withLabel("Trigger")); - } + if (getWindowFn() != null) { + builder + .add(DisplayData.item("windowFn", getWindowFn().getClass()) + .withLabel("Windowing Function")) + .include("windowFn", getWindowFn()); + } - if (getAccumulationMode() != null) { - builder.add(DisplayData.item("accumulationMode", getAccumulationMode().toString()) - .withLabel("Accumulation Mode")); - } + if (getAllowedLateness() != null) { + builder.addIfNotDefault(DisplayData.item("allowedLateness", getAllowedLateness()) + .withLabel("Allowed Lateness"), + Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); + } - if (getClosingBehavior() != null) { - builder.add(DisplayData.item("closingBehavior", getClosingBehavior().toString()) - .withLabel("Window Closing Behavior")); - } + if (getTrigger() != null && !(getTrigger() instanceof DefaultTrigger)) { + builder.add(DisplayData.item("trigger", getTrigger().toString()) + .withLabel("Trigger")); + } - if (getOutputTimeFn() != null) { - builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass()) - .withLabel("Output Time Function")); - } + if (getAccumulationMode() != null) { + builder.add(DisplayData.item("accumulationMode", getAccumulationMode().toString()) + .withLabel("Accumulation Mode")); } - @Override - protected Coder<?> getDefaultOutputCoder(PCollection<T> input) { - return input.getCoder(); + if (getClosingBehavior() != null) { + builder.add(DisplayData.item("closingBehavior", getClosingBehavior().toString()) + .withLabel("Window Closing Behavior")); } - @Override - protected String getKindString() { - return "Window.Into()"; + if (getOutputTimeFn() != null) { + builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass()) + .withLabel("Output Time Function")); } } + @Override + protected Coder<?> getDefaultOutputCoder(PCollection<T> input) { + return input.getCoder(); + } + + @Override + protected String getKindString() { + return "Window.Into()"; + } + /** * A Primitive {@link PTransform} that assigns windows to elements based on a {@link WindowFn}. */ public static class Assign<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final Bound<T> original; + private final Window<T> original; private final WindowingStrategy<T, ?> updatedStrategy; /** @@ -449,7 +440,7 @@ public class Window { * {@link #getWindowFn()}. */ @VisibleForTesting - Assign(Bound<T> original, WindowingStrategy updatedStrategy) { + Assign(Window<T> original, WindowingStrategy updatedStrategy) { this.original = original; this.updatedStrategy = updatedStrategy; } http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index e80bc17..0c27c4f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -59,7 +59,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged. // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time. // Because this outputs as fast as possible, this should not hold the watermark. - Window.Bound<KV<K, V>> rewindow = + Window<KV<K, V>> rewindow = Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder())) .triggering(new ReshuffleTrigger<>()) .discardingFiredPanes() http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 80f6f66..3ecbed4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -118,8 +118,8 @@ public class WriteTest { }; private static class WindowAndReshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> { - private final Window.Bound<T> window; - public WindowAndReshuffle(Window.Bound<T> window) { + private final Window<T> window; + public WindowAndReshuffle(Window<T> window) { this.window = window; } http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java index e1ac54f..9b0b27d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/TopTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.Window.Bound; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.hamcrest.Matchers; @@ -153,7 +152,7 @@ public class TopTest { public void testTopEmptyWithIncompatibleWindows() { p.enableAbandonedNodeEnforcement(false); - Bound<String> windowingFn = Window.<String>into(FixedWindows.of(Duration.standardDays(10L))); + Window<String> windowingFn = Window.<String>into(FixedWindows.of(Duration.standardDays(10L))); PCollection<String> input = p.apply(Create.empty(StringUtf8Coder.of())).apply(windowingFn); expectedEx.expect(IllegalStateException.class); http://git-wip-us.apache.org/repos/asf/beam/blob/6848950c/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 979179d..8bf022b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -52,7 +52,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; -import org.apache.beam.sdk.transforms.windowing.Window.Bound; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.KV; @@ -168,7 +167,7 @@ public class WindowTest implements Serializable { /** * With {@link #testWindowIntoNullWindowFnNoAssign()}, demonstrates that the expansions of the - * {@link Window.Bound} transform depends on if it actually assigns elements to windows. + * {@link Window} transform depends on if it actually assigns elements to windows. */ @Test public void testWindowIntoWindowFnAssign() { @@ -192,7 +191,7 @@ public class WindowTest implements Serializable { /** * With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the - * {@link Window.Bound} transform depends on if it actually assigns elements to windows. + * {@link Window} transform depends on if it actually assigns elements to windows. */ @Test public void testWindowIntoNullWindowFnNoAssign() { @@ -429,7 +428,7 @@ public class WindowTest implements Serializable { Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); - Window.Bound<?> window = Window + Window<?> window = Window .into(windowFn) .triggering(triggerBuilder) .accumulatingFiredPanes() @@ -459,7 +458,7 @@ public class WindowTest implements Serializable { Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); - Window.Bound<?> window = Window + Window<?> window = Window .into(windowFn) .triggering(triggerBuilder) .accumulatingFiredPanes() @@ -486,7 +485,7 @@ public class WindowTest implements Serializable { public void testAssignDisplayDataUnchanged() { FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5)); - Bound<Object> original = Window.into(windowFn); + Window<Object> original = Window.into(windowFn); WindowingStrategy<?, ?> updated = WindowingStrategy.globalDefault().withWindowFn(windowFn); DisplayData displayData = DisplayData.from(new Window.Assign<>(original, updated)); @@ -503,7 +502,7 @@ public class WindowTest implements Serializable { @Test public void testDisplayDataExcludesUnspecifiedProperties() { - Window.Bound<?> onlyHasAccumulationMode = Window.<Object>configure().discardingFiredPanes(); + Window<?> onlyHasAccumulationMode = Window.<Object>configure().discardingFiredPanes(); assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf( "windowFn", "trigger", @@ -511,14 +510,14 @@ public class WindowTest implements Serializable { "allowedLateness", "closingBehavior"))))); - Window.Bound<?> noAccumulationMode = Window.into(new GlobalWindows()); + Window<?> noAccumulationMode = Window.into(new GlobalWindows()); assertThat(DisplayData.from(noAccumulationMode), not(hasDisplayItem(hasKey("accumulationMode")))); } @Test public void testDisplayDataExcludesDefaults() { - Window.Bound<?> window = Window.into(new GlobalWindows()) + Window<?> window = Window.into(new GlobalWindows()) .triggering(DefaultTrigger.of()) .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
