[BEAM-80] Decide EARLY or ON_TIME based on input watermark

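This change follows the trigger specification. Instead of relying on an `isEndOfWindow` flag (set when an end-of-window timer fires), PaneInfoTracker now classifies a pane as EARLY or ON_TIME by checking whether the input watermark has passed the window's max timestamp.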


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/968494f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/968494f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/968494f3

Branch: refs/heads/master
Commit: 968494f3e427cc242d91ef6de25f5c7c408540dc
Parents: f87f35b
Author: Pei He <[email protected]>
Authored: Wed Mar 2 16:28:43 2016 -0800
Committer: Kenneth Knowles <[email protected]>
Committed: Wed Mar 2 19:01:09 2016 -0800

----------------------------------------------------------------------
 .../dataflow/sdk/util/PaneInfoTracker.java      | 30 +++++++++----------
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 31 ++++++++------------
 .../dataflow/sdk/util/ReduceFnRunnerTest.java   |  9 +++---
 3 files changed, 32 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
index 38499c2..a7818a3 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/PaneInfoTracker.java
@@ -54,13 +54,11 @@ public class PaneInfoTracker {
    * Return a ({@link ReadableState} for) the pane info appropriate for {@code 
context}. The pane
    * info includes the timing for the pane, who's calculation is quite subtle.
    *
-   * @param isEndOfWindow should be {@code true} only if the pane is being 
emitted
-   * because an end-of-window timer has fired and the trigger agreed we should 
fire.
    * @param isFinal should be {@code true} only if the triggering machinery 
can guarantee
    * no further firings for the
    */
-  public ReadableState<PaneInfo> getNextPaneInfo(ReduceFn<?, ?, ?, ?>.Context 
context,
-      final boolean isEndOfWindow, final boolean isFinal) {
+  public ReadableState<PaneInfo> getNextPaneInfo(
+      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
     final Object key = context.key();
     final ReadableState<PaneInfo> previousPaneFuture =
         context.state().access(PaneInfoTracker.PANE_INFO_TAG);
@@ -76,7 +74,7 @@ public class PaneInfoTracker {
       @Override
       public PaneInfo read() {
         PaneInfo previousPane = previousPaneFuture.read();
-        return describePane(key, windowMaxTimestamp, previousPane, 
isEndOfWindow, isFinal);
+        return describePane(key, windowMaxTimestamp, previousPane, isFinal);
       }
     };
   }
@@ -85,8 +83,8 @@ public class PaneInfoTracker {
     context.state().access(PANE_INFO_TAG).write(currentPane);
   }
 
-  private <W> PaneInfo describePane(Object key, Instant windowMaxTimestamp, 
PaneInfo previousPane,
-      boolean isEndOfWindow, boolean isFinal) {
+  private <W> PaneInfo describePane(
+      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean 
isFinal) {
     boolean isFirst = previousPane == null;
     Timing previousTiming = isFirst ? null : previousPane.getTiming();
     long index = isFirst ? 0 : previousPane.getIndex() + 1;
@@ -104,26 +102,28 @@ public class PaneInfoTracker {
     // if the output watermark is behind the end of the window.
     boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == 
Timing.EARLY;
 
+    // True if the input watermark hasn't passed the window's max timestamp.
+    boolean isEarlyForInput = inputWM == null || !inputWM.isAfter(windowMaxTimestamp);
+
     Timing timing;
     if (isLateForOutput || !onlyEarlyPanesSoFar) {
       // The output watermark has already passed the end of this window, or we 
have already
       // emitted a non-EARLY pane. Irrespective of how this pane was triggered 
we must
       // consider this pane LATE.
       timing = Timing.LATE;
-    } else if (isEndOfWindow) {
-      // This is the unique ON_TIME firing for the window.
-      timing = Timing.ON_TIME;
-    } else {
-      // All other cases are EARLY.
+    } else if (isEarlyForInput) {
+      // This is an EARLY firing.
       timing = Timing.EARLY;
       nonSpeculativeIndex = -1;
+    } else {
+      // This is the unique ON_TIME firing for the window.
+      timing = Timing.ON_TIME;
     }
 
     WindowTracing.debug(
         "describePane: {} pane (prev was {}) for key:{}; 
windowMaxTimestamp:{}; "
-        + "inputWatermark:{}; outputWatermark:{}; isEndOfWindow:{}; 
isLateForOutput:{}",
-        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, 
isEndOfWindow,
-        isLateForOutput);
+        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
+        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, 
isLateForOutput);
 
     if (previousPane != null) {
       // Timing transitions should follow EARLY* ON_TIME? LATE*

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index fe5c474..1a009bb 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -289,7 +289,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
           contextFactory.base(mergedWindow, StateStyle.RENAMED);
       triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
-      emitIfAppropriate(directContext, renamedContext, false/* isEndOfWindow 
*/);
+      emitIfAppropriate(directContext, renamedContext);
     }
 
     // We're all done with merging and emitting elements so can compress the 
activeWindow state.
@@ -532,14 +532,6 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
           "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window 
{}", timer, window);
     }
 
-    // If this is an end-of-window timer then:
-    // 1. We need to set a GC timer
-    // 2. We need to let the PaneInfoTracker know that we are transitioning 
from early to late,
-    // and possibly emitting an on-time pane.
-    boolean isEndOfWindow =
-        TimeDomain.EVENT_TIME == timer.getDomain()
-        && timer.getTimestamp().equals(window.maxTimestamp());
-
     // If this is a garbage collection timer then we should trigger and 
garbage collect the window.
     Instant cleanupTime = 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
     boolean isGarbageCollection =
@@ -556,7 +548,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
         // We need to call onTrigger to emit the final pane if required.
         // The final pane *may* be ON_TIME if no prior ON_TIME pane has been 
emitted,
         // and the watermark has passed the end of the window.
-        onTrigger(directContext, renamedContext, isEndOfWindow, true/* 
isFinished */);
+        onTrigger(directContext, renamedContext, true/* isFinished */);
       }
 
       // Cleanup flavor B: Clear all the remaining state for this window since 
we'll never
@@ -569,9 +561,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
           key, window, timer.getTimestamp(), 
timerInternals.currentInputWatermarkTime(),
           timerInternals.currentOutputWatermarkTime());
       if (windowIsActive) {
-        emitIfAppropriate(directContext, renamedContext, isEndOfWindow);
+        emitIfAppropriate(directContext, renamedContext);
       }
 
+      // If this is an end-of-window timer, then we need to set a GC timer.
+      boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
+          && timer.getTimestamp().equals(window.maxTimestamp());
       if (isEndOfWindow) {
         // Since we are processing an on-time firing we should schedule the 
garbage collection
         // timer. (If getAllowedLateness is zero then the timer event will be 
considered a
@@ -649,7 +644,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
    * Possibly emit a pane if a trigger is ready to fire or timers require it, 
and cleanup state.
    */
   private void emitIfAppropriate(ReduceFn<K, InputT, OutputT, W>.Context 
directContext,
-      ReduceFn<K, InputT, OutputT, W>.Context renamedContext, boolean 
isEndOfWindow)
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
       throws Exception {
     if (!triggerRunner.shouldFire(
         directContext.window(), directContext.timers(), 
directContext.state())) {
@@ -667,7 +662,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     // Run onTrigger to produce the actual pane contents.
     // As a side effect it will clear all element holds, but not necessarily 
any
     // end-of-window or garbage collection holds.
-    onTrigger(directContext, renamedContext, isEndOfWindow, isFinished);
+    onTrigger(directContext, renamedContext, isFinished);
 
     // Now that we've triggered, the pane is empty.
     nonEmptyPanes.clearPane(renamedContext.state());
@@ -692,13 +687,12 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
   /**
    * Do we need to emit a pane?
    */
-  private boolean needToEmit(
-      boolean isEmpty, boolean isEndOfWindow, boolean isFinished, 
PaneInfo.Timing timing) {
+  private boolean needToEmit(boolean isEmpty, boolean isFinished, 
PaneInfo.Timing timing) {
     if (!isEmpty) {
       // The pane has elements.
       return true;
     }
-    if (isEndOfWindow && timing == Timing.ON_TIME) {
+    if (timing == Timing.ON_TIME) {
       // This is the unique ON_TIME pane.
       return true;
     }
@@ -715,14 +709,13 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
   private void onTrigger(
       final ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
-      boolean isEndOfWindow,
       boolean isFinished)
           throws Exception {
     // Prefetch necessary states
     ReadableState<Instant> outputTimestampFuture =
         watermarkHold.extractAndRelease(renamedContext, 
isFinished).readLater();
     ReadableState<PaneInfo> paneFuture =
-        paneInfoTracker.getNextPaneInfo(directContext, isEndOfWindow, 
isFinished).readLater();
+        paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
     ReadableState<Boolean> isEmptyFuture =
         nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
 
@@ -735,7 +728,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     final Instant outputTimestamp = outputTimestampFuture.read();
 
     // Only emit a pane if it has data or empty panes are observable.
-    if (needToEmit(isEmptyFuture.read(), isEndOfWindow, isFinished, 
pane.getTiming())) {
+    if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
       // Run reduceFn.onTrigger method.
       final List<W> windows = 
Collections.singletonList(directContext.window());
       ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/968494f3/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
index c85b1ca..4fb3e37 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java
@@ -479,19 +479,20 @@ public class ReduceFnRunnerTest {
     when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 3);
     assertThat(tester.extractOutput(), contains(
-        // This is late, because the trigger wasn't waiting for AfterWatermark
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, 
Timing.EARLY, 2, -1))));
+        WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))));
 
     when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
     injectElement(tester, 4);
     assertThat(tester.extractOutput(), contains(
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, 
Timing.EARLY, 3, -1))));
+        WindowMatchers.valueWithPaneInfo(
+            PaneInfo.createPane(false, false, Timing.LATE, 3, 1))));
 
     when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true);
     triggerShouldFinish(mockTrigger);
     injectElement(tester, 5);
     assertThat(tester.extractOutput(), contains(
-        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, 
Timing.EARLY, 4, -1))));
+        WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, 
Timing.LATE, 4, 2))));
   }
 
   @Test

Reply via email to