Repository: incubator-beam
Updated Branches:
  refs/heads/master e9c0d8d05 -> c772f4295


Forward port changes to GC holds

Forward port changes to BEAM/pull/391 from DataflowJavaSDK/pull/289.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f78912b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f78912b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f78912b

Branch: refs/heads/master
Commit: 0f78912b23e827327527adf9764134daccb60ad4
Parents: e9c0d8d
Author: Mark Shields <[email protected]>
Authored: Tue May 31 14:44:20 2016 -0700
Committer: bchambers <[email protected]>
Committed: Wed Jun 1 10:50:59 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/ReduceFnRunner.java    |  5 +-
 .../org/apache/beam/sdk/util/WatermarkHold.java | 51 ++++++++++----------
 2 files changed, 29 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f78912b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 889ac6f..34208da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -606,7 +606,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
         // and the watermark has passed the end of the window.
         @Nullable Instant newHold =
             onTrigger(directContext, renamedContext, true/* isFinished */, 
isEndOfWindow);
-        Preconditions.checkState(newHold == null);
+        Preconditions.checkState(newHold == null,
+            "Hold placed at %s despite isFinished being true.", newHold);
       }
 
       // Cleanup flavor B: Clear all the remaining state for this window since 
we'll never
@@ -691,7 +692,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       //
       // But(!) for backwards compatibility we must allow a pipeline to be 
updated from
       // an sdk version <= 1.3. In that case it is possible we have an 
end-of-window or
-      // garbage collection holds keyed by the current window (reached via 
directContext) rather
+      // garbage collection hold keyed by the current window (reached via 
directContext) rather
       // than the state address window (reached via renamedContext).
       // However this can only happen if:
       // - We have merging windows.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f78912b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
index ad842b5..14ec082 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java
@@ -206,13 +206,13 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
   private Instant shift(Instant timestamp, W window) {
     Instant shifted = 
windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window);
     Preconditions.checkState(!shifted.isBefore(timestamp),
-                             "OutputTimeFn moved element from %s to earlier 
time %s for window %s",
-                             timestamp, shifted, window);
+        "OutputTimeFn moved element from %s to earlier time %s for window %s",
+        timestamp, shifted, window);
     Preconditions.checkState(timestamp.isAfter(window.maxTimestamp())
-                             || !shifted.isAfter(window.maxTimestamp()),
-                             "OutputTimeFn moved element from %s to %s which 
is beyond end of "
-                             + "window %s",
-                             timestamp, shifted, window);
+            || !shifted.isAfter(window.maxTimestamp()),
+        "OutputTimeFn moved element from %s to %s which is beyond end of "
+            + "window %s",
+        timestamp, shifted, window);
 
     return shifted;
   }
@@ -311,16 +311,16 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
     if (eowHold.isBefore(inputWM)) {
       WindowTracing.trace(
           "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too 
late for "
-          + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; 
outputWatermark:{}",
+              + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; 
outputWatermark:{}",
           eowHold, context.key(), context.window(), inputWM, outputWM);
       return null;
     }
 
     Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM),
-                             "End-of-window hold %s cannot be before output 
watermark %s",
-                             eowHold, outputWM);
+        "End-of-window hold %s cannot be before output watermark %s",
+        eowHold, outputWM);
     
Preconditions.checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-                             "End-of-window hold %s is beyond end-of-time", 
eowHold);
+        "End-of-window hold %s is beyond end-of-time", eowHold);
     // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we 
want to keep
     // the hold away from the combining function in elementHoldTag.
     // However if !paneIsEmpty then it could make sense  to use the 
elementHoldTag here.
@@ -331,7 +331,7 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
     context.state().access(EXTRA_HOLD_TAG).add(eowHold);
     WindowTracing.trace(
         "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time 
for "
-        + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+            + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
         eowHold, context.key(), context.window(), inputWM, outputWM);
     return eowHold;
   }
@@ -371,33 +371,33 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
     if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) {
       WindowTracing.trace(
           "WatermarkHold.addGarbageCollectionHold: garbage collection hold at 
{} is unnecessary "
-          + "since no allowed lateness for key:{}; window:{}; 
inputWatermark:{}; "
-          + "outputWatermark:{}",
+              + "since no allowed lateness for key:{}; window:{}; 
inputWatermark:{}; "
+              + "outputWatermark:{}",
           gcHold, context.key(), context.window(), inputWM, outputWM);
       return null;
     }
 
     if (paneIsEmpty && context.windowingStrategy().getClosingBehavior()
-                       == ClosingBehavior.FIRE_IF_NON_EMPTY) {
+        == ClosingBehavior.FIRE_IF_NON_EMPTY) {
       WindowTracing.trace(
           "WatermarkHold.addGarbageCollectionHold: garbage collection hold at 
{} is unnecessary "
-          + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; 
inputWatermark:{}; "
-          + "outputWatermark:{}",
+              + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; 
inputWatermark:{}; "
+              + "outputWatermark:{}",
           gcHold, context.key(), context.window(), inputWM, outputWM);
       return null;
     }
 
     Preconditions.checkState(!gcHold.isBefore(inputWM),
-                             "Garbage collection hold %s cannot be before 
input watermark %s",
-                             gcHold, inputWM);
+        "Garbage collection hold %s cannot be before input watermark %s",
+        gcHold, inputWM);
     
Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
-                             "Garbage collection hold %s is beyond 
end-of-time", gcHold);
+        "Garbage collection hold %s is beyond end-of-time", gcHold);
     // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in 
addEndOfWindowHold above.
     context.state().access(EXTRA_HOLD_TAG).add(gcHold);
 
     WindowTracing.trace(
         "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} 
is on time for "
-        + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
+            + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
         gcHold, context.key(), context.window(), inputWM, outputWM);
     return gcHold;
   }
@@ -416,9 +416,9 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
    */
   public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) {
     WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; 
inputWatermark:{}; "
-                        + "outputWatermark:{}",
-                        context.key(), context.window(), 
timerInternals.currentInputWatermarkTime(),
-                        timerInternals.currentOutputWatermarkTime());
+            + "outputWatermark:{}",
+        context.key(), context.window(), 
timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
     StateMerging.mergeWatermarks(context.state(), elementHoldTag, 
context.window());
     // If we had a cheap way to determine if we have an element hold then we 
could
     // avoid adding an unnecessary end-of-window or garbage collection hold.
@@ -435,7 +435,8 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
    */
   public static class OldAndNewHolds {
     public final Instant oldHold;
-    @Nullable public final Instant newHold;
+    @Nullable
+    public final Instant newHold;
 
     public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) {
       this.oldHold = oldHold;
@@ -456,7 +457,7 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
       final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) {
     WindowTracing.debug(
         "WatermarkHold.extractAndRelease: for key:{}; window:{}; 
inputWatermark:{}; "
-        + "outputWatermark:{}",
+            + "outputWatermark:{}",
         context.key(), context.window(), 
timerInternals.currentInputWatermarkTime(),
         timerInternals.currentOutputWatermarkTime());
     final WatermarkHoldState<W> elementHoldState = 
context.state().access(elementHoldTag);

Reply via email to