Repository: beam
Updated Branches:
  refs/heads/master 023e6ab94 -> 132d3c5f6
Uses AutoValue in Window Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/876d13dd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/876d13dd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/876d13dd Branch: refs/heads/master Commit: 876d13dd367909490bdd052d5c140a784dacff14 Parents: 3563c4b Author: Eugene Kirpichov <[email protected]> Authored: Tue Mar 28 18:14:30 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Fri Mar 31 10:59:37 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/windowing/Window.java | 165 +++++++------------ 1 file changed, 64 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/876d13dd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 5f5295d..a6c7adf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms.windowing; +import com.google.auto.value.AutoValue; import com.google.common.annotations.VisibleForTesting; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -169,7 +170,7 @@ public class Window { * properties can be set on it first. */ public static <T> Bound<T> into(WindowFn<? 
super T, ?> fn) { - return new Bound<T>().into(fn); + return Window.<T>configure().into(fn); } /** @@ -182,7 +183,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public static <T> Bound<T> triggering(Trigger trigger) { - return new Bound<T>().triggering(trigger); + return Window.<T>configure().triggering(trigger); } /** @@ -194,7 +195,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public static <T> Bound<T> discardingFiredPanes() { - return new Bound<T>().discardingFiredPanes(); + return Window.<T>configure().discardingFiredPanes(); } /** @@ -206,7 +207,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public static <T> Bound<T> accumulatingFiredPanes() { - return new Bound<T>().accumulatingFiredPanes(); + return Window.<T>configure().accumulatingFiredPanes(); } /** @@ -222,16 +223,11 @@ public class Window { */ @Experimental(Kind.TRIGGER) public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) { - return new Bound<T>().withAllowedLateness(allowedLateness); + return Window.<T>configure().withAllowedLateness(allowedLateness); } - /** - * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control - * the output timestamp of values output from a {@link GroupByKey} operation. - */ - @Experimental(Kind.OUTPUT_TIME) - public static <T> Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { - return new Bound<T>().withOutputTimeFn(outputTimeFn); + public static <T> Bound<T> configure() { + return new AutoValue_Window_Bound.Builder<T>().build(); } /** @@ -240,33 +236,29 @@ public class Window { * * @param <T> The type of elements this {@code Window} is applied to */ - public static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> { - - - @Nullable private final WindowFn<? 
super T, ?> windowFn; - @Nullable private final Trigger trigger; - @Nullable private final AccumulationMode mode; - @Nullable private final Duration allowedLateness; - @Nullable private final ClosingBehavior closingBehavior; - @Nullable private final OutputTimeFn<?> outputTimeFn; - - private Bound( - @Nullable WindowFn<? super T, ?> windowFn, - @Nullable Trigger trigger, - @Nullable AccumulationMode mode, - @Nullable Duration allowedLateness, - ClosingBehavior behavior, - @Nullable OutputTimeFn<?> outputTimeFn) { - this.windowFn = windowFn; - this.trigger = trigger; - this.mode = mode; - this.allowedLateness = allowedLateness; - this.closingBehavior = behavior; - this.outputTimeFn = outputTimeFn; - } - - private Bound() { - this(null, null, null, null, null, null); + @AutoValue + public abstract static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> { + @Nullable + public abstract WindowFn<? super T, ?> getWindowFn(); + + @Nullable abstract Trigger getTrigger(); + @Nullable abstract AccumulationMode getAccumulationMode(); + @Nullable abstract Duration getAllowedLateness(); + @Nullable abstract ClosingBehavior getClosingBehavior(); + @Nullable abstract OutputTimeFn<?> getOutputTimeFn(); + + abstract Builder<T> toBuilder(); + + @AutoValue.Builder + abstract static class Builder<T> { + abstract Builder<T> setWindowFn(WindowFn<? 
super T, ?> windowFn); + abstract Builder<T> setTrigger(Trigger trigger); + abstract Builder<T> setAccumulationMode(AccumulationMode mode); + abstract Builder<T> setAllowedLateness(Duration allowedLateness); + abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior); + abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn); + + abstract Bound<T> build(); } /** @@ -283,8 +275,7 @@ public class Window { throw new IllegalArgumentException("Window coders must be deterministic.", e); } - return new Bound<>( - windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn); + return toBuilder().setWindowFn(windowFn).build(); } /** @@ -300,13 +291,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public Bound<T> triggering(Trigger trigger) { - return new Bound<>( - windowFn, - trigger, - mode, - allowedLateness, - closingBehavior, - outputTimeFn); + return toBuilder().setTrigger(trigger).build(); } /** @@ -318,13 +303,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public Bound<T> discardingFiredPanes() { - return new Bound<>( - windowFn, - trigger, - AccumulationMode.DISCARDING_FIRED_PANES, - allowedLateness, - closingBehavior, - outputTimeFn); + return toBuilder().setAccumulationMode(AccumulationMode.DISCARDING_FIRED_PANES).build(); } /** @@ -336,13 +315,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public Bound<T> accumulatingFiredPanes() { - return new Bound<>( - windowFn, - trigger, - AccumulationMode.ACCUMULATING_FIRED_PANES, - allowedLateness, - closingBehavior, - outputTimeFn); + return toBuilder().setAccumulationMode(AccumulationMode.ACCUMULATING_FIRED_PANES).build(); } /** @@ -360,8 +333,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public Bound<T> withAllowedLateness(Duration allowedLateness) { - return new Bound<>( - windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn); + return toBuilder().setAllowedLateness(allowedLateness).build(); } /** @@ -370,8 +342,7 @@ 
public class Window { */ @Experimental(Kind.OUTPUT_TIME) public Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) { - return new Bound<>( - windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn); + return toBuilder().setOutputTimeFn(outputTimeFn).build(); } /** @@ -386,7 +357,7 @@ public class Window { */ @Experimental(Kind.TRIGGER) public Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) { - return new Bound<>(windowFn, trigger, mode, allowedLateness, behavior, outputTimeFn); + return toBuilder().setAllowedLateness(allowedLateness).setClosingBehavior(behavior).build(); } /** @@ -398,35 +369,27 @@ public class Window { public WindowingStrategy<?, ?> getOutputStrategyInternal( WindowingStrategy<?, ?> inputStrategy) { WindowingStrategy<?, ?> result = inputStrategy; - if (windowFn != null) { - result = result.withWindowFn(windowFn); + if (getWindowFn() != null) { + result = result.withWindowFn(getWindowFn()); } - if (trigger != null) { - result = result.withTrigger(trigger); + if (getTrigger() != null) { + result = result.withTrigger(getTrigger()); } - if (mode != null) { - result = result.withMode(mode); + if (getAccumulationMode() != null) { + result = result.withMode(getAccumulationMode()); } - if (allowedLateness != null) { - result = result.withAllowedLateness(allowedLateness); + if (getAllowedLateness() != null) { + result = result.withAllowedLateness(getAllowedLateness()); } - if (closingBehavior != null) { - result = result.withClosingBehavior(closingBehavior); + if (getClosingBehavior() != null) { + result = result.withClosingBehavior(getClosingBehavior()); } - if (outputTimeFn != null) { - result = result.withOutputTimeFn(outputTimeFn); + if (getOutputTimeFn() != null) { + result = result.withOutputTimeFn(getOutputTimeFn()); } return result; } - /** - * Get the {@link WindowFn} of this {@link Window.Bound Window PTransform}. - */ - @Nullable - public WindowFn<? 
super T, ?> getWindowFn() { - return windowFn; - } - @Override public void validate(PCollection<T> input) { WindowingStrategy<?, ?> outputStrategy = @@ -468,7 +431,7 @@ public class Window { public PCollection<T> expand(PCollection<T> input) { WindowingStrategy<?, ?> outputStrategy = getOutputStrategyInternal(input.getWindowingStrategy()); - if (windowFn == null) { + if (getWindowFn() == null) { // A new PCollection must be created in case input is reused in a different location as the // two PCollections will, in general, have a different windowing strategy. return PCollectionList.of(input) @@ -484,36 +447,36 @@ public class Window { public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); - if (windowFn != null) { + if (getWindowFn() != null) { builder - .add(DisplayData.item("windowFn", windowFn.getClass()) + .add(DisplayData.item("windowFn", getWindowFn().getClass()) .withLabel("Windowing Function")) - .include("windowFn", windowFn); + .include("windowFn", getWindowFn()); } - if (allowedLateness != null) { - builder.addIfNotDefault(DisplayData.item("allowedLateness", allowedLateness) + if (getAllowedLateness() != null) { + builder.addIfNotDefault(DisplayData.item("allowedLateness", getAllowedLateness()) .withLabel("Allowed Lateness"), Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); } - if (trigger != null && !(trigger instanceof DefaultTrigger)) { - builder.add(DisplayData.item("trigger", trigger.toString()) + if (getTrigger() != null && !(getTrigger() instanceof DefaultTrigger)) { + builder.add(DisplayData.item("trigger", getTrigger().toString()) .withLabel("Trigger")); } - if (mode != null) { - builder.add(DisplayData.item("accumulationMode", mode.toString()) + if (getAccumulationMode() != null) { + builder.add(DisplayData.item("accumulationMode", getAccumulationMode().toString()) .withLabel("Accumulation Mode")); } - if (closingBehavior != null) { - builder.add(DisplayData.item("closingBehavior", 
closingBehavior.toString()) + if (getClosingBehavior() != null) { + builder.add(DisplayData.item("closingBehavior", getClosingBehavior().toString()) .withLabel("Window Closing Behavior")); } - if (outputTimeFn != null) { - builder.add(DisplayData.item("outputTimeFn", outputTimeFn.getClass()) + if (getOutputTimeFn() != null) { + builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass()) .withLabel("Output Time Function")); } }
