Repository: incubator-beam
Updated Branches:
  refs/heads/master e9c0d8d05 -> c772f4295
Forward port changes to GC holds

Forward port changes to BEAM/pull/391 from DataflowJavaSDK/pull/289.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0f78912b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0f78912b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0f78912b

Branch: refs/heads/master
Commit: 0f78912b23e827327527adf9764134daccb60ad4
Parents: e9c0d8d
Author: Mark Shields <[email protected]>
Authored: Tue May 31 14:44:20 2016 -0700
Committer: bchambers <[email protected]>
Committed: Wed Jun 1 10:50:59 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/ReduceFnRunner.java | 5 +-
 .../org/apache/beam/sdk/util/WatermarkHold.java | 51 ++++++++++----------
 2 files changed, 29 insertions(+), 27 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f78912b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
index 889ac6f..34208da 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java
@@ -606,7 +606,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
 // and the watermark has passed the end of the window.
@Nullable Instant newHold = onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow); - Preconditions.checkState(newHold == null); + Preconditions.checkState(newHold == null, + "Hold placed at %s despite isFinished being true.", newHold); } // Cleanup flavor B: Clear all the remaining state for this window since we'll never @@ -691,7 +692,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // // But(!) for backwards compatibility we must allow a pipeline to be updated from // an sdk version <= 1.3. In that case it is possible we have an end-of-window or - // garbage collection holds keyed by the current window (reached via directContext) rather + // garbage collection hold keyed by the current window (reached via directContext) rather // than the state address window (reached via renamedContext). // However this can only happen if: // - We have merging windows. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0f78912b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java index ad842b5..14ec082 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java @@ -206,13 +206,13 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { private Instant shift(Instant timestamp, W window) { Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); Preconditions.checkState(!shifted.isBefore(timestamp), - "OutputTimeFn moved element from %s to earlier time %s for window %s", - timestamp, shifted, window); + "OutputTimeFn moved element from %s to earlier time %s for window %s", + timestamp, shifted, window); 
Preconditions.checkState(timestamp.isAfter(window.maxTimestamp()) - || !shifted.isAfter(window.maxTimestamp()), - "OutputTimeFn moved element from %s to %s which is beyond end of " - + "window %s", - timestamp, shifted, window); + || !shifted.isAfter(window.maxTimestamp()), + "OutputTimeFn moved element from %s to %s which is beyond end of " + + "window %s", + timestamp, shifted, window); return shifted; } @@ -311,16 +311,16 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { if (eowHold.isBefore(inputWM)) { WindowTracing.trace( "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for " - + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", eowHold, context.key(), context.window(), inputWM, outputWM); return null; } Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM), - "End-of-window hold %s cannot be before output watermark %s", - eowHold, outputWM); + "End-of-window hold %s cannot be before output watermark %s", + eowHold, outputWM); Preconditions.checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "End-of-window hold %s is beyond end-of-time", eowHold); + "End-of-window hold %s is beyond end-of-time", eowHold); // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep // the hold away from the combining function in elementHoldTag. // However if !paneIsEmpty then it could make sense to use the elementHoldTag here. 
@@ -331,7 +331,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { context.state().access(EXTRA_HOLD_TAG).add(eowHold); WindowTracing.trace( "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", eowHold, context.key(), context.window(), inputWM, outputWM); return eowHold; } @@ -371,33 +371,33 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) { WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " - + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", + + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; " + + "outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM); return null; } if (paneIsEmpty && context.windowingStrategy().getClosingBehavior() - == ClosingBehavior.FIRE_IF_NON_EMPTY) { + == ClosingBehavior.FIRE_IF_NON_EMPTY) { WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " - + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", + + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; " + + "outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM); return null; } Preconditions.checkState(!gcHold.isBefore(inputWM), - "Garbage collection hold %s cannot be before input watermark %s", - gcHold, inputWM); + "Garbage collection hold %s cannot be before input watermark %s", + gcHold, inputWM); Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Garbage collection hold %s is beyond end-of-time", gcHold); + "Garbage collection hold %s is beyond end-of-time", 
gcHold); // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above. context.state().access(EXTRA_HOLD_TAG).add(gcHold); WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", gcHold, context.key(), context.window(), inputWM, outputWM); return gcHold; } @@ -416,9 +416,9 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { */ public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) { WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - context.key(), context.window(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); + + "outputWatermark:{}", + context.key(), context.window(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window()); // If we had a cheap way to determine if we have an element hold then we could // avoid adding an unnecessary end-of-window or garbage collection hold. 
@@ -435,7 +435,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { */ public static class OldAndNewHolds { public final Instant oldHold; - @Nullable public final Instant newHold; + @Nullable + public final Instant newHold; public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { this.oldHold = oldHold; @@ -456,7 +457,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) { WindowTracing.debug( "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", + + "outputWatermark:{}", context.key(), context.window(), timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag);
