Repository: beam
Updated Branches:
  refs/heads/master 3bcbba121 -> b82cd2446


http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index 14f818a..268718a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -20,17 +20,20 @@ package org.apache.beam.sdk.util;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.MoreObjects;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Objects;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 /**
  * A {@code WindowingStrategy} describes the windowing behavior for a specific 
collection of values.
@@ -55,22 +58,22 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
   private static final WindowingStrategy<Object, GlobalWindow> DEFAULT = 
of(new GlobalWindows());
 
   private final WindowFn<T, W> windowFn;
+  private final OutputTimeFn<? super W> outputTimeFn;
   private final Trigger trigger;
   private final AccumulationMode mode;
   private final Duration allowedLateness;
   private final ClosingBehavior closingBehavior;
-  private final TimestampCombiner timestampCombiner;
   private final boolean triggerSpecified;
   private final boolean modeSpecified;
   private final boolean allowedLatenessSpecified;
-  private final boolean timestampCombinerSpecified;
+  private final boolean outputTimeFnSpecified;
 
   private WindowingStrategy(
       WindowFn<T, W> windowFn,
       Trigger trigger, boolean triggerSpecified,
       AccumulationMode mode, boolean modeSpecified,
       Duration allowedLateness, boolean allowedLatenessSpecified,
-      TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
+      OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified,
       ClosingBehavior closingBehavior) {
     this.windowFn = windowFn;
     this.trigger = trigger;
@@ -80,8 +83,8 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
     this.allowedLateness = allowedLateness;
     this.allowedLatenessSpecified = allowedLatenessSpecified;
     this.closingBehavior = closingBehavior;
-    this.timestampCombiner = timestampCombiner;
-    this.timestampCombinerSpecified = timestampCombinerSpecified;
+    this.outputTimeFn = outputTimeFn;
+    this.outputTimeFnSpecified = outputTimeFnSpecified;
   }
 
   /**
@@ -97,7 +100,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
         DefaultTrigger.of(), false,
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
-        TimestampCombiner.END_OF_WINDOW, false,
+        new CombineWindowFnOutputTimes(OutputTimeFns.outputAtEndOfWindow(), 
windowFn), false,
         ClosingBehavior.FIRE_IF_NON_EMPTY);
   }
 
@@ -133,12 +136,12 @@ public class WindowingStrategy<T, W extends 
BoundedWindow> implements Serializab
     return closingBehavior;
   }
 
-  public TimestampCombiner getTimestampCombiner() {
-    return timestampCombiner;
+  public OutputTimeFn<? super W> getOutputTimeFn() {
+    return outputTimeFn;
   }
 
-  public boolean isTimestampCombinerSpecified() {
-    return timestampCombinerSpecified;
+  public boolean isOutputTimeFnSpecified() {
+    return outputTimeFnSpecified;
   }
 
   /**
@@ -151,7 +154,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
         trigger, true,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -165,7 +168,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
         trigger, triggerSpecified,
         mode, true,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -177,12 +180,17 @@ public class WindowingStrategy<T, W extends 
BoundedWindow> implements Serializab
     @SuppressWarnings("unchecked")
     WindowFn<T, W> typedWindowFn = (WindowFn<T, W>) wildcardWindowFn;
 
+    // The onus of type correctness falls on the callee.
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>)
+        new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn);
+
     return new WindowingStrategy<T, W>(
         typedWindowFn,
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        newOutputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -196,7 +204,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, true,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
@@ -206,19 +214,40 @@ public class WindowingStrategy<T, W extends 
BoundedWindow> implements Serializab
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
+        outputTimeFn, outputTimeFnSpecified,
         closingBehavior);
   }
 
   @Experimental(Experimental.Kind.OUTPUT_TIME)
-  public WindowingStrategy<T, W> withTimestampCombiner(TimestampCombiner 
timestampCombiner) {
+  public WindowingStrategy<T, W> withOutputTimeFn(OutputTimeFn<?> 
outputTimeFn) {
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) 
outputTimeFn;
+
+    OutputTimeFn<? super W> newOutputTimeFn =
+        new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn);
 
     return new WindowingStrategy<T, W>(
         windowFn,
         trigger, triggerSpecified,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, true,
+        newOutputTimeFn, true,
+        closingBehavior);
+  }
+
+  /**
+   * Fixes all the defaults so that equals can be used to check that two 
strategies are the same,
+   * regardless of the state of "defaulted-ness".
+   */
+  @VisibleForTesting
+  public WindowingStrategy<T, W> fixDefaults() {
+    return new WindowingStrategy<>(
+        windowFn,
+        trigger, true,
+        mode, true,
+        allowedLateness, true,
+        outputTimeFn, true,
         closingBehavior);
   }
 
@@ -229,7 +258,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> 
implements Serializab
         .add("allowedLateness", allowedLateness)
         .add("trigger", trigger)
         .add("accumulationMode", mode)
-        .add("timestampCombiner", timestampCombiner)
+        .add("outputTimeFn", outputTimeFn)
         .toString();
   }
 
@@ -239,45 +268,104 @@ public class WindowingStrategy<T, W extends 
BoundedWindow> implements Serializab
       return false;
     }
     WindowingStrategy<?, ?> other = (WindowingStrategy<?, ?>) object;
-    return isTriggerSpecified() == other.isTriggerSpecified()
+    return
+        isTriggerSpecified() == other.isTriggerSpecified()
         && isAllowedLatenessSpecified() == other.isAllowedLatenessSpecified()
         && isModeSpecified() == other.isModeSpecified()
-        && isTimestampCombinerSpecified() == 
other.isTimestampCombinerSpecified()
         && getMode().equals(other.getMode())
         && getAllowedLateness().equals(other.getAllowedLateness())
         && getClosingBehavior().equals(other.getClosingBehavior())
         && getTrigger().equals(other.getTrigger())
-        && getTimestampCombiner().equals(other.getTimestampCombiner())
         && getWindowFn().equals(other.getWindowFn());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(
-        triggerSpecified,
-        allowedLatenessSpecified,
-        modeSpecified,
-        timestampCombinerSpecified,
-        mode,
-        allowedLateness,
-        closingBehavior,
-        trigger,
-        timestampCombiner,
-        windowFn);
+    return Objects.hash(triggerSpecified, allowedLatenessSpecified, 
modeSpecified,
+        windowFn, trigger, mode, allowedLateness, closingBehavior);
   }
 
   /**
-   * Fixes all the defaults so that equals can be used to check that two 
strategies are the same,
-   * regardless of the state of "defaulted-ness".
+   * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to 
assign initial timestamps
+   * but then combines and merges according to a given {@link OutputTimeFn}.
+   *
+   * <ul>
+   *   <li>The {@link WindowFn#getOutputTime} allows adjustments such as that 
whereby
+   *       {@link 
org.apache.beam.sdk.transforms.windowing.SlidingWindows#getOutputTime}
+   *       moves elements later in time to avoid holding up progress 
downstream.</li>
+   *   <li>Then, when multiple elements are buffered for output, the output 
timestamp of the
+   *       result is calculated using {@link OutputTimeFn#combine}.</li>
+   *   <li>In the case of a merging {@link WindowFn}, the output timestamp 
when windows merge
+   *       is calculated using {@link OutputTimeFn#merge}.</li>
+   * </ul>
    */
-  @VisibleForTesting
-  public WindowingStrategy<T, W> fixDefaults() {
-    return new WindowingStrategy<>(
-        windowFn,
-        trigger, true,
-        mode, true,
-        allowedLateness, true,
-        timestampCombiner, true,
-        closingBehavior);
+  public static class CombineWindowFnOutputTimes<W extends BoundedWindow>
+      extends OutputTimeFn<W> {
+
+    private final OutputTimeFn<? super W> outputTimeFn;
+    private final WindowFn<?, W> windowFn;
+
+    public CombineWindowFnOutputTimes(
+        OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) {
+      this.outputTimeFn = outputTimeFn;
+      this.windowFn = windowFn;
+    }
+
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
+    @Override
+    public Instant assignOutputTime(Instant inputTimestamp, W window) {
+      return outputTimeFn.merge(
+          window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, 
window)));
+    }
+
+    @Override
+    public Instant combine(Instant timestamp, Instant otherTimestamp) {
+      return outputTimeFn.combine(timestamp, otherTimestamp);
+    }
+
+    @Override
+    public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) {
+      return outputTimeFn.merge(newWindow, timestamps);
+    }
+
+    @Override
+    public final boolean dependsOnlyOnWindow() {
+      return outputTimeFn.dependsOnlyOnWindow();
+    }
+
+    @Override
+    public boolean dependsOnlyOnEarliestInputTimestamp() {
+      return outputTimeFn.dependsOnlyOnEarliestInputTimestamp();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == this) {
+        return true;
+      }
+
+      if (!(obj instanceof CombineWindowFnOutputTimes)) {
+        return false;
+      }
+
+      CombineWindowFnOutputTimes<?> that = (CombineWindowFnOutputTimes<?>) obj;
+      return outputTimeFn.equals(that.outputTimeFn) && 
windowFn.equals(that.windowFn);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(outputTimeFn, windowFn);
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("outputTimeFn", outputTimeFn)
+          .add("windowFn", windowFn)
+          .toString();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
index f9ab115..64841fb 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateBinder.java
@@ -21,7 +21,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 
 /**
  * Visitor for binding a {@link StateSpec} and to the associated {@link State}.
@@ -63,11 +63,11 @@ public interface StateBinder<K> {
   /**
    * Bind to a watermark {@link StateSpec}.
    *
-   * <p>This accepts the {@link TimestampCombiner} that dictates how watermark 
hold timestamps added
-   * to the returned {@link WatermarkHoldState} are to be combined.
+   * <p>This accepts the {@link OutputTimeFn} that dictates how watermark hold 
timestamps added to
+   * the returned {@link WatermarkHoldState} are to be combined.
    */
-  <W extends BoundedWindow> WatermarkHoldState bindWatermark(
+  <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
       String id,
-      StateSpec<? super K, WatermarkHoldState> spec,
-      TimestampCombiner timestampCombiner);
+      StateSpec<? super K, WatermarkHoldState<W>> spec,
+      OutputTimeFn<? super W> outputTimeFn);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
index 8fa5bb0..dc647da 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/StateSpecs.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
 import 
org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 
 /**
  * Static utility methods for creating {@link StateSpec} instances.
@@ -208,9 +208,9 @@ public class StateSpecs {
 
   /** Create a state spec for holding the watermark. */
   public static <W extends BoundedWindow>
-      StateSpec<Object, WatermarkHoldState> watermarkStateInternal(
-          TimestampCombiner timestampCombiner) {
-    return new WatermarkStateSpecInternal<W>(timestampCombiner);
+      StateSpec<Object, WatermarkHoldState<W>> watermarkStateInternal(
+          OutputTimeFn<? super W> outputTimeFn) {
+    return new WatermarkStateSpecInternal<W>(outputTimeFn);
   }
 
   public static <K, InputT, AccumT, OutputT>
@@ -656,26 +656,26 @@ public class StateSpecs {
   /**
    * A specification for a state cell tracking a combined watermark hold.
    *
-   * <p>Includes the {@link TimestampCombiner} according to which the output 
times
+   * <p>Includes the {@link OutputTimeFn} according to which the output times
    * are combined.
    */
   private static class WatermarkStateSpecInternal<W extends BoundedWindow>
-      implements StateSpec<Object, WatermarkHoldState> {
+      implements StateSpec<Object, WatermarkHoldState<W>> {
 
     /**
      * When multiple output times are added to hold the watermark, this 
determines how they are
      * combined, and also the behavior when merging windows. Does not 
contribute to equality/hash
      * since we have at most one watermark hold spec per computation.
      */
-    private final TimestampCombiner timestampCombiner;
+    private final OutputTimeFn<? super W> outputTimeFn;
 
-    private WatermarkStateSpecInternal(TimestampCombiner timestampCombiner) {
-      this.timestampCombiner = timestampCombiner;
+    private WatermarkStateSpecInternal(OutputTimeFn<? super W> outputTimeFn) {
+      this.outputTimeFn = outputTimeFn;
     }
 
     @Override
-    public WatermarkHoldState bind(String id, StateBinder<?> visitor) {
-      return visitor.bindWatermark(id, this, timestampCombiner);
+    public WatermarkHoldState<W> bind(String id, StateBinder<?> visitor) {
+      return visitor.bindWatermark(id, this, outputTimeFn);
     }
 
     @Override
@@ -701,4 +701,5 @@ public class StateSpecs {
       return Objects.hash(getClass());
     }
   }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
index ae9b700..20fa05f 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/WatermarkHoldState.java
@@ -19,24 +19,25 @@ package org.apache.beam.sdk.util.state;
 
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.joda.time.Instant;
 
 /**
- * A {@link State} accepting and aggregating output timestamps, which 
determines the time to which
- * the output watermark must be held.
+ * A {@link State} accepting and aggregating output timestamps, which 
determines
+ * the time to which the output watermark must be held.
  *
  * <p><b><i>For internal use only. This API may change at any time.</i></b>
  */
 @Experimental(Kind.STATE)
-public interface WatermarkHoldState extends GroupingState<Instant, Instant> {
+public interface WatermarkHoldState<W extends BoundedWindow>
+    extends GroupingState<Instant, Instant> {
   /**
-   * Return the {@link TimestampCombiner} which will be used to determine a 
watermark hold time
-   * given an element timestamp, and to combine watermarks from windows which 
are about to be
-   * merged.
+   * Return the {@link OutputTimeFn} which will be used to determine a 
watermark hold time given
+   * an element timestamp, and to combine watermarks from windows which are 
about to be merged.
    */
-  TimestampCombiner getTimestampCombiner();
+  OutputTimeFn<? super W> getOutputTimeFn();
 
   @Override
-  WatermarkHoldState readLater();
+  WatermarkHoldState<W> readLater();
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
index 26dd9f9..153bd84 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/SdkCoreApiSurfaceTest.java
@@ -39,6 +39,7 @@ public class SdkCoreApiSurfaceTest {
         ImmutableSet.of(
             "org.apache.beam",
             "com.google.api.client",
+            "com.google.protobuf",
             "com.fasterxml.jackson.annotation",
             "com.fasterxml.jackson.core",
             "com.fasterxml.jackson.databind",

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 0556199..939261f 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -51,8 +51,8 @@ import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.Reshuffle;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -318,14 +318,14 @@ public class GroupByKeyTest {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerEarliest() {
+  public void testOutputTimeFnEarliest() {
 
     p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, 
String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST))
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(0))));
 
@@ -339,13 +339,13 @@ public class GroupByKeyTest {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerLatest() {
+  public void testOutputTimeFnLatest() {
     p.apply(
         Create.timestamped(
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, 
String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withTimestampCombiner(TimestampCombiner.LATEST))
+            .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new AssertTimestamp(new Instant(10))));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
index 9a17bc7..4e61f4e 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java
@@ -41,7 +41,7 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -241,7 +241,7 @@ public class CoGroupByKeyTest implements Serializable {
             Arrays.asList(0L, 2L, 4L, 6L, 8L))
         .apply("WindowClicks", Window.<KV<Integer, String>>into(
             FixedWindows.of(new Duration(4)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST));
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
 
     PCollection<KV<Integer, String>> purchasesTable =
         createInput("CreatePurchases",
@@ -250,7 +250,7 @@ public class CoGroupByKeyTest implements Serializable {
             Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L))
         .apply("WindowPurchases", Window.<KV<Integer, String>>into(
             FixedWindows.of(new Duration(4)))
-            .withTimestampCombiner(TimestampCombiner.EARLIEST));
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()));
 
     PCollection<KV<Integer, CoGbkResult>> coGbkResults =
         KeyedPCollectionTuple.of(clicksTag, clicksTable)

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
new file mode 100644
index 0000000..78d7a2f
--- /dev/null
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFnsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link OutputTimeFns}. */
+@RunWith(Parameterized.class)
+public class OutputTimeFnsTest {
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<OutputTimeFn<BoundedWindow>> data() {
+    return ImmutableList.of(
+        OutputTimeFns.outputAtEarliestInputTimestamp(),
+        OutputTimeFns.outputAtLatestInputTimestamp(),
+        OutputTimeFns.outputAtEndOfWindow());
+  }
+
+  @Parameter(0)
+  public OutputTimeFn<?> outputTimeFn;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    OutputTimeFn<?> result = 
OutputTimeFns.fromProto(OutputTimeFns.toProto(outputTimeFn));
+
+    assertThat(result, equalTo((OutputTimeFn) outputTimeFn));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
index 9d94928..b131688 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
@@ -118,7 +118,7 @@ public class SessionsTest {
   }
 
   /**
-   * Test to confirm that {@link Sessions} with the default {@link 
TimestampCombiner} holds up the
+   * Test to confirm that {@link Sessions} with the default {@link 
OutputTimeFn} holds up the
    * watermark potentially indefinitely.
    */
   @Test
@@ -126,7 +126,7 @@ public class SessionsTest {
     try {
       WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
           Sessions.withGapDuration(Duration.millis(10)),
-          TimestampCombiner.EARLIEST,
+          OutputTimeFns.outputAtEarliestInputTimestamp(),
           ImmutableList.of(
               (List<Long>) ImmutableList.of(1L, 3L),
               (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));
@@ -148,7 +148,7 @@ public class SessionsTest {
   public void testValidOutputAtEndTimes() throws Exception {
     WindowFnTestUtils.<Object, IntervalWindow>validateGetOutputTimestamps(
         Sessions.withGapDuration(Duration.millis(10)),
-        TimestampCombiner.END_OF_WINDOW,
+        OutputTimeFns.outputAtEndOfWindow(),
           ImmutableList.of(
               (List<Long>) ImmutableList.of(1L, 3L),
               (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L)));

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 534e230..e1ed66a 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -366,7 +366,7 @@ public class WindowTest implements Serializable {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerDefault() {
+  public void testOutputTimeFnDefault() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline
@@ -400,7 +400,7 @@ public class WindowTest implements Serializable {
    */
   @Test
   @Category(ValidatesRunner.class)
-  public void testTimestampCombinerEndOfWindow() {
+  public void testOutputTimeFnEndOfWindow() {
     pipeline.enableAbandonedNodeEnforcement(true);
 
     pipeline.apply(
@@ -408,7 +408,7 @@ public class WindowTest implements Serializable {
             TimestampedValue.of(KV.of(0, "hello"), new Instant(0)),
             TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10))))
         .apply(Window.<KV<Integer, 
String>>into(FixedWindows.of(Duration.standardMinutes(10)))
-            .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW))
+            .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
         .apply(GroupByKey.<Integer, String>create())
         .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() {
           @ProcessElement
@@ -426,14 +426,14 @@ public class WindowTest implements Serializable {
     AfterWatermark.FromEndOfWindow triggerBuilder = 
AfterWatermark.pastEndOfWindow();
     Duration allowedLateness = Duration.standardMinutes(10);
     Window.ClosingBehavior closingBehavior = 
Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
-    TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
+    OutputTimeFn<BoundedWindow> outputTimeFn = 
OutputTimeFns.outputAtEndOfWindow();
 
     Window<?> window = Window
         .into(windowFn)
         .triggering(triggerBuilder)
         .accumulatingFiredPanes()
         .withAllowedLateness(allowedLateness, closingBehavior)
-        .withTimestampCombiner(timestampCombiner);
+        .withOutputTimeFn(outputTimeFn);
 
     DisplayData displayData = DisplayData.from(window);
 
@@ -446,7 +446,7 @@ public class WindowTest implements Serializable {
     assertThat(displayData,
         hasDisplayItem("allowedLateness", allowedLateness));
     assertThat(displayData, hasDisplayItem("closingBehavior", 
closingBehavior.toString()));
-    assertThat(displayData, hasDisplayItem("timestampCombiner", 
timestampCombiner.toString()));
+    assertThat(displayData, hasDisplayItem("outputTimeFn", 
outputTimeFn.getClass()));
   }
 
   @Test
@@ -456,14 +456,14 @@ public class WindowTest implements Serializable {
     AfterWatermark.FromEndOfWindow triggerBuilder = 
AfterWatermark.pastEndOfWindow();
     Duration allowedLateness = Duration.standardMinutes(10);
     Window.ClosingBehavior closingBehavior = 
Window.ClosingBehavior.FIRE_IF_NON_EMPTY;
-    TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW;
+    OutputTimeFn<BoundedWindow> outputTimeFn = 
OutputTimeFns.outputAtEndOfWindow();
 
     Window<?> window = Window
         .into(windowFn)
         .triggering(triggerBuilder)
         .accumulatingFiredPanes()
         .withAllowedLateness(allowedLateness, closingBehavior)
-        .withTimestampCombiner(timestampCombiner);
+        .withOutputTimeFn(outputTimeFn);
 
     DisplayData primitiveDisplayData =
         Iterables.getOnlyElement(
@@ -478,8 +478,7 @@ public class WindowTest implements Serializable {
     assertThat(primitiveDisplayData,
         hasDisplayItem("allowedLateness", allowedLateness));
     assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", 
closingBehavior.toString()));
-    assertThat(
-        primitiveDisplayData, hasDisplayItem("timestampCombiner", 
timestampCombiner.toString()));
+    assertThat(primitiveDisplayData, hasDisplayItem("outputTimeFn", 
outputTimeFn.getClass()));
   }
 
   @Test
@@ -498,7 +497,7 @@ public class WindowTest implements Serializable {
     assertThat(displayData, not(hasDisplayItem("accumulationMode")));
     assertThat(displayData, not(hasDisplayItem("allowedLateness")));
     assertThat(displayData, not(hasDisplayItem("closingBehavior")));
-    assertThat(displayData, not(hasDisplayItem("timestampCombiner")));
+    assertThat(displayData, not(hasDisplayItem("outputTimeFn")));
   }
 
   @Test
@@ -507,7 +506,7 @@ public class WindowTest implements Serializable {
     assertThat(DisplayData.from(onlyHasAccumulationMode), 
not(hasDisplayItem(hasKey(isOneOf(
         "windowFn",
         "trigger",
-        "timestampCombiner",
+        "outputTimeFn",
         "allowedLateness",
         "closingBehavior")))));
 

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 30b0311..a3f5352 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -76,7 +76,7 @@ public class WindowingTest implements Serializable {
     public PCollection<String> expand(PCollection<String> in) {
       return in.apply("Window",
               Window.<String>into(windowFn)
-                  .withTimestampCombiner(TimestampCombiner.EARLIEST))
+                  
.withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()))
           .apply(Count.<String>perElement())
           .apply("FormatCounts", ParDo.of(new FormatCountsDoFn()))
           .setCoder(StringUtf8Coder.of());

http://git-wip-us.apache.org/repos/asf/beam/blob/83d41fcc/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
index 215b0f4..50edd83 100644
--- 
a/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
+++ 
b/sdks/java/extensions/gcp-core/src/test/java/org/apache/beam/GcpCoreApiSurfaceTest.java
@@ -42,6 +42,7 @@ public class GcpCoreApiSurfaceTest {
             "com.google.api.services.cloudresourcemanager",
             "com.google.api.services.storage",
             "com.google.auth",
+            "com.google.protobuf",
             "com.fasterxml.jackson.annotation",
             "com.fasterxml.jackson.core",
             "com.fasterxml.jackson.databind",

Reply via email to