Use a consistent calculation for garbage collection time

Truncate all garbage collection timestamps so that none is later than
the end of the global window.

Add a Reshuffle test with late data; before this change it failed when late data arrived.

Update ReifyTimestamps to permit unbounded timestamp skew. Elements
whose timestamps are extracted from their values may be late, but
lateness is not the concern of ReifyTimestamps.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dbb850c3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dbb850c3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dbb850c3

Branch: refs/heads/master
Commit: dbb850c3c3dedc7f62d90cdf7f15fe7cff79ce1b
Parents: e92ac6f
Author: Thomas Groh <[email protected]>
Authored: Thu May 11 09:26:30 2017 -0700
Committer: Thomas Groh <[email protected]>
Committed: Thu May 11 14:56:17 2017 -0700

----------------------------------------------------------------------
 .../core/LateDataDroppingDoFnRunner.java        |  2 +-
 .../apache/beam/runners/core/LateDataUtils.java | 33 ++++++-
 .../beam/runners/core/ReduceFnRunner.java       | 43 +++-------
 .../beam/runners/core/SimpleDoFnRunner.java     |  2 +-
 .../beam/runners/core/StatefulDoFnRunner.java   |  6 +-
 .../apache/beam/runners/core/WatermarkHold.java |  9 +-
 .../beam/runners/core/LateDataUtilsTest.java    | 90 ++++++++++++++++++++
 .../beam/sdk/transforms/ReifyTimestamps.java    |  6 ++
 .../apache/beam/sdk/transforms/Reshuffle.java   | 29 ++++---
 .../sdk/transforms/ReifyTimestampsTest.java     | 36 ++++++++
 .../beam/sdk/transforms/ReshuffleTest.java      | 27 ++++++
 11 files changed, 234 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 66385c1..570f524 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -159,7 +159,7 @@ public class LateDataDroppingDoFnRunner<K, InputT, OutputT, 
W extends BoundedWin
     /** Is {@code window} expired w.r.t. the garbage collection watermark? */
     private boolean canDropDueToExpiredWindow(BoundedWindow window) {
       Instant inputWM = timerInternals.currentInputWatermarkTime();
-      return 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM);
+      return LateDataUtils.garbageCollectionTime(window, 
windowingStrategy).isBefore(inputWM);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
index 732e60c..8a2b7c6 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -24,15 +24,46 @@ import com.google.common.collect.Iterables;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.metrics.CounterCell;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
-
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 
 /**
  * Utils to handle late data.
  */
 public class LateDataUtils {
+  private LateDataUtils() {}
+
+  /**
+   * Return when {@code window} should be garbage collected. If the window's 
expiration time is on
+   * or after the end of the global window, it will be truncated to the end of 
the global window.
+   */
+  public static Instant garbageCollectionTime(
+      BoundedWindow window, WindowingStrategy windowingStrategy) {
+    return garbageCollectionTime(window, 
windowingStrategy.getAllowedLateness());
+  }
+
+  /**
+   * Return when {@code window} should be garbage collected. If the window's 
expiration time is on
+   * or after the end of the global window, it will be truncated to the end of 
the global window.
+   */
+  public static Instant garbageCollectionTime(BoundedWindow window, Duration 
allowedLateness) {
+
+    // If the end of the window + allowed lateness is beyond the "end of time" 
aka the end of the
+    // global window, then we truncate it. The conditional is phrased like it 
is because the
+    // addition of EOW + allowed lateness might even overflow the maximum 
allowed Instant
+    if (GlobalWindow.INSTANCE
+        .maxTimestamp()
+        .minus(allowedLateness)
+        .isBefore(window.maxTimestamp())) {
+      return GlobalWindow.INSTANCE.maxTimestamp();
+    } else {
+      return window.maxTimestamp().plus(allowedLateness);
+    }
+  }
 
   /**
    * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains 
non-late input elements.

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index d2ed835..62d519f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -48,7 +48,6 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
@@ -663,7 +662,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       W window = directContext.window();
       this.isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
           && timer.getTimestamp().equals(window.maxTimestamp());
-      Instant cleanupTime = garbageCollectionTime(window);
+      Instant cleanupTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
       this.isGarbageCollection = !timer.getTimestamp().isBefore(cleanupTime);
     }
 
@@ -769,9 +768,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
           // cleanup event and handled by the above).
           // Note we must do this even if the trigger is finished so that we 
are sure to cleanup
           // any final trigger finished bits.
-          
checkState(windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
+          checkState(
+              
windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
               "Unexpected zero getAllowedLateness");
-          Instant cleanupTime = garbageCollectionTime(directContext.window());
+          Instant cleanupTime =
+              LateDataUtils.garbageCollectionTime(directContext.window(), 
windowingStrategy);
           WindowTracing.debug(
               "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; 
window:{} at {} with "
                   + "inputWatermark:{}; outputWatermark:{}",
@@ -957,6 +958,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     // Extract the window hold, and as a side effect clear it.
     final WatermarkHold.OldAndNewHolds pair =
         watermarkHold.extractAndRelease(renamedContext, isFinished).read();
+    // TODO: This isn't accurate if the elements are late. See BEAM-2262
     final Instant outputTimestamp = pair.oldHold;
     @Nullable Instant newHold = pair.newHold;
 
@@ -972,11 +974,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       if (newHold.isAfter(directContext.window().maxTimestamp())) {
         // The hold must be for garbage collection, which can't have happened 
yet.
         checkState(
-          newHold.isEqual(garbageCollectionTime(directContext.window())),
-          "new hold %s should be at garbage collection for window %s plus %s",
-          newHold,
-          directContext.window(),
-          windowingStrategy.getAllowedLateness());
+            newHold.isEqual(
+                LateDataUtils.garbageCollectionTime(directContext.window(), 
windowingStrategy)),
+            "new hold %s should be at garbage collection for window %s plus 
%s",
+            newHold,
+            directContext.window(),
+            windowingStrategy.getAllowedLateness());
       } else {
         // The hold must be for the end-of-window, which can't have happened 
yet.
         checkState(
@@ -1042,7 +1045,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     String which;
     Instant timer;
     if (endOfWindow.isBefore(inputWM)) {
-      timer = garbageCollectionTime(directContext.window());
+      timer = LateDataUtils.garbageCollectionTime(directContext.window(), 
windowingStrategy);
       which = "garbage collection";
     } else {
       timer = endOfWindow;
@@ -1072,28 +1075,10 @@ public class ReduceFnRunner<K, InputT, OutputT, W 
extends BoundedWindow> {
         timerInternals.currentOutputWatermarkTime());
     Instant eow = directContext.window().maxTimestamp();
     directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
-    Instant gc = garbageCollectionTime(directContext.window());
+    Instant gc = LateDataUtils.garbageCollectionTime(directContext.window(), 
windowingStrategy);
     if (gc.isAfter(eow)) {
       directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
     }
   }
 
-  /**
-   * Return when {@code window} should be garbage collected. If the window's 
expiration time is on
-   * or after the end of the global window, it will be truncated to the end of 
the global window.
-   */
-  private Instant garbageCollectionTime(W window) {
-
-    // If the end of the window + allowed lateness is beyond the "end of time" 
aka the end of the
-    // global window, then we truncate it. The conditional is phrased like it 
is because the
-    // addition of EOW + allowed lateness might even overflow the maximum 
allowed Instant
-    if (GlobalWindow.INSTANCE
-        .maxTimestamp()
-        .minus(windowingStrategy.getAllowedLateness())
-        .isBefore(window.maxTimestamp())) {
-      return GlobalWindow.INSTANCE.maxTimestamp();
-    } else {
-      return 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 85423c0..7ca305e 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -949,7 +949,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
      */
     private Instant minTargetAndGcTime(Instant target) {
       if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
-        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+        Instant windowExpiry = LateDataUtils.garbageCollectionTime(window, 
allowedLateness);
         if (target.isAfter(windowExpiry)) {
           return windowExpiry;
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 28a9dee..c68a943 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -104,7 +104,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
   }
 
   private boolean isLate(BoundedWindow window) {
-    Instant gcTime = 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+    Instant gcTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
     Instant inputWM = cleanupTimer.currentInputWatermarkTime();
     return gcTime.isBefore(inputWM);
   }
@@ -208,7 +208,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
 
     @Override
     public void setForWindow(BoundedWindow window) {
-      Instant gcTime = 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      Instant gcTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
       // make sure this fires after any window.maxTimestamp() timers
       gcTime = gcTime.plus(GC_DELAY_MS);
       timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
@@ -222,7 +222,7 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
         Instant timestamp,
         TimeDomain timeDomain) {
       boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-      Instant gcTime = 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      Instant gcTime = LateDataUtils.garbageCollectionTime(window, 
windowingStrategy);
       gcTime = gcTime.plus(GC_DELAY_MS);
       return isEventTimer && GC_TIMER_ID.equals(timerId) && 
gcTime.equals(timestamp);
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 64f5d9b..13e4c43 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -365,8 +365,7 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
       ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) {
     Instant outputWM = timerInternals.currentOutputWatermarkTime();
     Instant inputWM = timerInternals.currentInputWatermarkTime();
-    Instant eow = context.window().maxTimestamp();
-    Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness());
+    Instant gcHold = LateDataUtils.garbageCollectionTime(context.window(), 
windowingStrategy);
 
     if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
       WindowTracing.trace(
@@ -387,6 +386,12 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
       return null;
     }
 
+    if (!gcHold.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      // If the garbage collection hold is past the timestamp we can 
represent, instead truncate
+      // to the maximum timestamp that is not positive infinity. This ensures 
all windows will
+      // eventually be garbage collected.
+      gcHold = BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L));
+    }
     checkState(!gcHold.isBefore(inputWM),
         "Garbage collection hold %s cannot be before input watermark %s",
         gcHold, inputWM);

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
new file mode 100644
index 0000000..f0f315d
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataUtilsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.joda.time.ReadableInstant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link LateDataUtils}.
+ */
+@RunWith(JUnit4.class)
+public class LateDataUtilsTest {
+  @Test
+  public void beforeEndOfGlobalWindowSame() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
+    Duration allowedLateness = Duration.standardMinutes(2);
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault()
+            .withWindowFn(windowFn)
+            .withAllowedLateness(allowedLateness);
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(10));
+    assertThat(
+        LateDataUtils.garbageCollectionTime(window, strategy),
+        equalTo(window.maxTimestamp().plus(allowedLateness)));
+  }
+
+  @Test
+  public void garbageCollectionTimeAfterEndOfGlobalWindow() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault()
+            .withWindowFn(windowFn);
+
+    IntervalWindow window = windowFn.assignWindow(new 
Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    assertThat(
+        window.maxTimestamp(),
+        
Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+    assertThat(
+        LateDataUtils.garbageCollectionTime(window, strategy),
+        equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
+  }
+
+  @Test
+  public void garbageCollectionTimeAfterEndOfGlobalWindowWithLateness() {
+    FixedWindows windowFn = FixedWindows.of(Duration.standardMinutes(5));
+    Duration allowedLateness = Duration.millis(Long.MAX_VALUE);
+    WindowingStrategy<?, ?> strategy =
+        WindowingStrategy.globalDefault()
+            .withWindowFn(windowFn)
+            .withAllowedLateness(allowedLateness);
+
+    IntervalWindow window = windowFn.assignWindow(new Instant(-100));
+    assertThat(
+        window.maxTimestamp().plus(allowedLateness),
+        
Matchers.<ReadableInstant>greaterThan(GlobalWindow.INSTANCE.maxTimestamp()));
+    assertThat(
+        LateDataUtils.garbageCollectionTime(window, strategy),
+        equalTo(GlobalWindow.INSTANCE.maxTimestamp()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
index 0b1ab25..990f235 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ReifyTimestamps.java
@@ -21,6 +21,7 @@ package org.apache.beam.sdk.transforms;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
 
 /**
  * {@link PTransform PTransforms} for reifying the timestamp of values and 
reemitting the original
@@ -63,6 +64,11 @@ class ReifyTimestamps {
 
   private static class ExtractTimestampedValueDoFn<K, V>
       extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
+    @Override
+    public Duration getAllowedTimestampSkew() {
+      return Duration.millis(Long.MAX_VALUE);
+    }
+
     @ProcessElement
     public void processElement(ProcessContext context) {
       KV<K, TimestampedValue<V>> kv = context.element();

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
index 5394826..3b7122c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java
@@ -71,22 +71,27 @@ public class Reshuffle<K, V> extends 
PTransform<PCollection<KV<K, V>>, PCollecti
             .withTimestampCombiner(TimestampCombiner.EARLIEST)
             
.withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
-    return input.apply(rewindow)
+    return input
+        .apply(rewindow)
         .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
         .apply(GroupByKey.<K, TimestampedValue<V>>create())
         // Set the windowing strategy directly, so that it doesn't get counted 
as the user having
         // set allowed lateness.
         .setWindowingStrategyInternal(originalStrategy)
-        .apply("ExpandIterable", ParDo.of(
-            new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, 
TimestampedValue<V>>>() {
-              @ProcessElement
-              public void processElement(ProcessContext c) {
-                K key = c.element().getKey();
-                for (TimestampedValue<V> value : c.element().getValue()) {
-                  c.output(KV.of(key, value));
-                }
-              }
-            }))
-        .apply("RestoreOriginalTimestamps", ReifyTimestamps.<K, 
V>extractFromValues());
+        .apply(
+            "ExpandIterable",
+            ParDo.of(
+                new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, 
TimestampedValue<V>>>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext c) {
+                    K key = c.element().getKey();
+                    for (TimestampedValue<V> value : c.element().getValue()) {
+                      c.output(KV.of(key, value));
+                    }
+                  }
+                }))
+        .apply(
+            "RestoreOriginalTimestamps",
+            ReifyTimestamps.<K, V>extractFromValues());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
index 181433e..e872842 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReifyTimestampsTest.java
@@ -101,4 +101,40 @@ public class ReifyTimestampsTest implements Serializable {
 
     pipeline.run();
   }
+
+  @Test
+  @Category(ValidatesRunner.class)
+  public void extractFromValuesWhenValueTimestampedLaterSucceeds() {
+    PCollection<KV<String, TimestampedValue<Integer>>> preified =
+        pipeline.apply(
+            Create.timestamped(
+                TimestampedValue.of(
+                    KV.of("foo", TimestampedValue.of(0, new Instant((0)))), 
new Instant(100)),
+                TimestampedValue.of(
+                    KV.of("foo", TimestampedValue.of(1, new Instant(1))), new 
Instant(101L)),
+                TimestampedValue.of(
+                    KV.of("bar", TimestampedValue.of(2, new Instant(2))), new 
Instant(102L)),
+                TimestampedValue.of(
+                    KV.of("baz", TimestampedValue.of(3, new Instant(3))), new 
Instant(103L))));
+
+    PCollection<KV<String, Integer>> timestamped =
+        preified.apply(ReifyTimestamps.<String, Integer>extractFromValues());
+
+    PAssert.that(timestamped)
+        .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), 
KV.of("baz", 3));
+
+    timestamped.apply(
+        "AssertElementTimestamps",
+        ParDo.of(
+            new DoFn<KV<String, Integer>, Void>() {
+              @ProcessElement
+              public void verifyTimestampsEqualValue(ProcessContext context) {
+                assertThat(
+                    new Instant(context.element().getValue().longValue()),
+                    equalTo(context.timestamp()));
+              }
+            }));
+
+    pipeline.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/dbb850c3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
index 1038fd6..3cd7cf9 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ReshuffleTest.java
@@ -27,8 +27,11 @@ import java.util.List;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
@@ -263,4 +266,28 @@ public class ReshuffleTest implements Serializable {
 
     pipeline.run();
   }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class})
+  public void testReshuffleWithTimestampsStreaming() {
+    TestStream<Long> stream =
+        TestStream.create(VarLongCoder.of())
+            .advanceWatermarkTo(new 
Instant(0L).plus(Duration.standardDays(48L)))
+            .addElements(
+                TimestampedValue.of(0L, new Instant(0L)),
+                TimestampedValue.of(1L, new 
Instant(0L).plus(Duration.standardDays(48L))),
+                TimestampedValue.of(
+                    2L, 
BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(48L))))
+            .advanceWatermarkToInfinity();
+    PCollection<KV<String, Long>> input =
+        pipeline
+            .apply(stream).apply(WithKeys.<String, Long>of(""))
+            .apply(
+                Window.<KV<String, 
Long>>into(FixedWindows.of(Duration.standardMinutes(10L))));
+
+    PCollection<KV<String, Long>> reshuffled = input.apply(Reshuffle.<String, 
Long>of());
+    
PAssert.that(reshuffled.apply(Values.<Long>create())).containsInAnyOrder(0L, 
1L, 2L);
+
+    pipeline.run();
+  }
 }

Reply via email to