Repository: beam
Updated Branches:
  refs/heads/master 023e6ab94 -> 132d3c5f6


Uses AutoValue in Window


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/876d13dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/876d13dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/876d13dd

Branch: refs/heads/master
Commit: 876d13dd367909490bdd052d5c140a784dacff14
Parents: 3563c4b
Author: Eugene Kirpichov <[email protected]>
Authored: Tue Mar 28 18:14:30 2017 -0700
Committer: Eugene Kirpichov <[email protected]>
Committed: Fri Mar 31 10:59:37 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/Window.java   | 165 +++++++------------
 1 file changed, 64 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/876d13dd/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 5f5295d..a6c7adf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.sdk.transforms.windowing;
 
+import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
@@ -169,7 +170,7 @@ public class Window {
    * properties can be set on it first.
    */
   public static <T> Bound<T> into(WindowFn<? super T, ?> fn) {
-    return new Bound<T>().into(fn);
+    return Window.<T>configure().into(fn);
   }
 
   /**
@@ -182,7 +183,7 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> triggering(Trigger trigger) {
-    return new Bound<T>().triggering(trigger);
+    return Window.<T>configure().triggering(trigger);
   }
 
   /**
@@ -194,7 +195,7 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> discardingFiredPanes() {
-    return new Bound<T>().discardingFiredPanes();
+    return Window.<T>configure().discardingFiredPanes();
   }
 
   /**
@@ -206,7 +207,7 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> accumulatingFiredPanes() {
-    return new Bound<T>().accumulatingFiredPanes();
+    return Window.<T>configure().accumulatingFiredPanes();
   }
 
   /**
@@ -222,16 +223,11 @@ public class Window {
    */
   @Experimental(Kind.TRIGGER)
   public static <T> Bound<T> withAllowedLateness(Duration allowedLateness) {
-    return new Bound<T>().withAllowedLateness(allowedLateness);
+    return Window.<T>configure().withAllowedLateness(allowedLateness);
   }
 
-  /**
-   * <b><i>(Experimental)</i></b> Override the default {@link OutputTimeFn}, to control
-   * the output timestamp of values output from a {@link GroupByKey} operation.
-   */
-  @Experimental(Kind.OUTPUT_TIME)
-  public static <T> Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
-    return new Bound<T>().withOutputTimeFn(outputTimeFn);
+  public static <T> Bound<T> configure() {
+    return new AutoValue_Window_Bound.Builder<T>().build();
   }
 
   /**
@@ -240,33 +236,29 @@ public class Window {
    *
    * @param <T> The type of elements this {@code Window} is applied to
    */
-  public static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> {
-
-
-    @Nullable private final WindowFn<? super T, ?> windowFn;
-    @Nullable private final Trigger trigger;
-    @Nullable private final AccumulationMode mode;
-    @Nullable private final Duration allowedLateness;
-    @Nullable private final ClosingBehavior closingBehavior;
-    @Nullable private final OutputTimeFn<?> outputTimeFn;
-
-    private Bound(
-        @Nullable WindowFn<? super T, ?> windowFn,
-        @Nullable Trigger trigger,
-        @Nullable AccumulationMode mode,
-        @Nullable Duration allowedLateness,
-        ClosingBehavior behavior,
-        @Nullable OutputTimeFn<?> outputTimeFn) {
-      this.windowFn = windowFn;
-      this.trigger = trigger;
-      this.mode = mode;
-      this.allowedLateness = allowedLateness;
-      this.closingBehavior = behavior;
-      this.outputTimeFn = outputTimeFn;
-    }
-
-    private Bound() {
-      this(null, null, null, null, null, null);
+  @AutoValue
+  public abstract static class Bound<T> extends PTransform<PCollection<T>, PCollection<T>> {
+    @Nullable
+    public abstract WindowFn<? super T, ?> getWindowFn();
+
+    @Nullable abstract Trigger getTrigger();
+    @Nullable abstract AccumulationMode getAccumulationMode();
+    @Nullable abstract Duration getAllowedLateness();
+    @Nullable abstract ClosingBehavior getClosingBehavior();
+    @Nullable abstract OutputTimeFn<?> getOutputTimeFn();
+
+    abstract Builder<T> toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+      abstract Builder<T> setWindowFn(WindowFn<? super T, ?> windowFn);
+      abstract Builder<T> setTrigger(Trigger trigger);
+      abstract Builder<T> setAccumulationMode(AccumulationMode mode);
+      abstract Builder<T> setAllowedLateness(Duration allowedLateness);
+      abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
+      abstract Builder<T> setOutputTimeFn(OutputTimeFn<?> outputTimeFn);
+
+      abstract Bound<T> build();
     }
 
     /**
@@ -283,8 +275,7 @@ public class Window {
         throw new IllegalArgumentException("Window coders must be deterministic.", e);
       }
 
-      return new Bound<>(
-          windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
+      return toBuilder().setWindowFn(windowFn).build();
     }
 
     /**
@@ -300,13 +291,7 @@ public class Window {
      */
     @Experimental(Kind.TRIGGER)
     public Bound<T> triggering(Trigger trigger) {
-      return new Bound<>(
-          windowFn,
-          trigger,
-          mode,
-          allowedLateness,
-          closingBehavior,
-          outputTimeFn);
+      return toBuilder().setTrigger(trigger).build();
     }
 
    /**
@@ -318,13 +303,7 @@ public class Window {
     */
     @Experimental(Kind.TRIGGER)
    public Bound<T> discardingFiredPanes() {
-     return new Bound<>(
-         windowFn,
-         trigger,
-         AccumulationMode.DISCARDING_FIRED_PANES,
-         allowedLateness,
-         closingBehavior,
-         outputTimeFn);
+     return toBuilder().setAccumulationMode(AccumulationMode.DISCARDING_FIRED_PANES).build();
    }
 
    /**
@@ -336,13 +315,7 @@ public class Window {
     */
    @Experimental(Kind.TRIGGER)
    public Bound<T> accumulatingFiredPanes() {
-     return new Bound<>(
-         windowFn,
-         trigger,
-         AccumulationMode.ACCUMULATING_FIRED_PANES,
-         allowedLateness,
-         closingBehavior,
-         outputTimeFn);
+     return toBuilder().setAccumulationMode(AccumulationMode.ACCUMULATING_FIRED_PANES).build();
    }
 
     /**
@@ -360,8 +333,7 @@ public class Window {
      */
     @Experimental(Kind.TRIGGER)
     public Bound<T> withAllowedLateness(Duration allowedLateness) {
-      return new Bound<>(
-          windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
+      return toBuilder().setAllowedLateness(allowedLateness).build();
     }
 
     /**
@@ -370,8 +342,7 @@ public class Window {
      */
     @Experimental(Kind.OUTPUT_TIME)
     public Bound<T> withOutputTimeFn(OutputTimeFn<?> outputTimeFn) {
-      return new Bound<>(
-          windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
+      return toBuilder().setOutputTimeFn(outputTimeFn).build();
     }
 
     /**
@@ -386,7 +357,7 @@ public class Window {
      */
     @Experimental(Kind.TRIGGER)
     public Bound<T> withAllowedLateness(Duration allowedLateness, ClosingBehavior behavior) {
-      return new Bound<>(windowFn, trigger, mode, allowedLateness, behavior, outputTimeFn);
+      return toBuilder().setAllowedLateness(allowedLateness).setClosingBehavior(behavior).build();
     }
 
     /**
@@ -398,35 +369,27 @@ public class Window {
     public WindowingStrategy<?, ?> getOutputStrategyInternal(
         WindowingStrategy<?, ?> inputStrategy) {
       WindowingStrategy<?, ?> result = inputStrategy;
-      if (windowFn != null) {
-        result = result.withWindowFn(windowFn);
+      if (getWindowFn() != null) {
+        result = result.withWindowFn(getWindowFn());
       }
-      if (trigger != null) {
-        result = result.withTrigger(trigger);
+      if (getTrigger() != null) {
+        result = result.withTrigger(getTrigger());
       }
-      if (mode != null) {
-        result = result.withMode(mode);
+      if (getAccumulationMode() != null) {
+        result = result.withMode(getAccumulationMode());
       }
-      if (allowedLateness != null) {
-        result = result.withAllowedLateness(allowedLateness);
+      if (getAllowedLateness() != null) {
+        result = result.withAllowedLateness(getAllowedLateness());
       }
-      if (closingBehavior != null) {
-        result = result.withClosingBehavior(closingBehavior);
+      if (getClosingBehavior() != null) {
+        result = result.withClosingBehavior(getClosingBehavior());
       }
-      if (outputTimeFn != null) {
-        result = result.withOutputTimeFn(outputTimeFn);
+      if (getOutputTimeFn() != null) {
+        result = result.withOutputTimeFn(getOutputTimeFn());
       }
       return result;
     }
 
-    /**
-     * Get the {@link WindowFn} of this {@link Window.Bound Window PTransform}.
-     */
-    @Nullable
-    public WindowFn<? super T, ?> getWindowFn() {
-      return windowFn;
-    }
-
     @Override
     public void validate(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
@@ -468,7 +431,7 @@ public class Window {
     public PCollection<T> expand(PCollection<T> input) {
       WindowingStrategy<?, ?> outputStrategy =
           getOutputStrategyInternal(input.getWindowingStrategy());
-      if (windowFn == null) {
+      if (getWindowFn() == null) {
         // A new PCollection must be created in case input is reused in a different location as the
         // two PCollections will, in general, have a different windowing strategy.
         return PCollectionList.of(input)
@@ -484,36 +447,36 @@ public class Window {
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
-      if (windowFn != null) {
+      if (getWindowFn() != null) {
         builder
-            .add(DisplayData.item("windowFn", windowFn.getClass())
+            .add(DisplayData.item("windowFn", getWindowFn().getClass())
               .withLabel("Windowing Function"))
-            .include("windowFn", windowFn);
+            .include("windowFn", getWindowFn());
       }
 
-      if (allowedLateness != null) {
-        builder.addIfNotDefault(DisplayData.item("allowedLateness", allowedLateness)
+      if (getAllowedLateness() != null) {
+        builder.addIfNotDefault(DisplayData.item("allowedLateness", getAllowedLateness())
               .withLabel("Allowed Lateness"),
             Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
       }
 
-      if (trigger != null && !(trigger instanceof DefaultTrigger)) {
-        builder.add(DisplayData.item("trigger", trigger.toString())
+      if (getTrigger() != null && !(getTrigger() instanceof DefaultTrigger)) {
+        builder.add(DisplayData.item("trigger", getTrigger().toString())
           .withLabel("Trigger"));
       }
 
-      if (mode != null) {
-        builder.add(DisplayData.item("accumulationMode", mode.toString())
+      if (getAccumulationMode() != null) {
+        builder.add(DisplayData.item("accumulationMode", getAccumulationMode().toString())
           .withLabel("Accumulation Mode"));
       }
 
-      if (closingBehavior != null) {
-        builder.add(DisplayData.item("closingBehavior", closingBehavior.toString())
+      if (getClosingBehavior() != null) {
+        builder.add(DisplayData.item("closingBehavior", getClosingBehavior().toString())
           .withLabel("Window Closing Behavior"));
       }
 
-      if (outputTimeFn != null) {
-        builder.add(DisplayData.item("outputTimeFn", outputTimeFn.getClass())
+      if (getOutputTimeFn() != null) {
+        builder.add(DisplayData.item("outputTimeFn", getOutputTimeFn().getClass())
           .withLabel("Output Time Function"));
       }
     }

Reply via email to