Repository: incubator-beam Updated Branches: refs/heads/master 00f608f05 -> 49d82baf1
Basic non-null checks Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1c89a1b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1c89a1b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1c89a1b3 Branch: refs/heads/master Commit: 1c89a1b3ac0ff296003ae443e6b4763f501b8ada Parents: 00f608f Author: Mark Shields <[email protected]> Authored: Wed Mar 2 20:45:59 2016 -0800 Committer: Mark Shields <[email protected]> Committed: Fri Mar 25 12:28:48 2016 -0700 ---------------------------------------------------------------------- .../cloud/dataflow/sdk/util/ReduceFnRunner.java | 51 +++++++++++++++++- .../cloud/dataflow/sdk/util/TriggerRunner.java | 2 + .../cloud/dataflow/sdk/util/WatermarkHold.java | 55 +++++++++++++++----- 3 files changed, 92 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 2e2d1f6..f1d4582 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -499,6 +499,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { directContext.timestamp(), directContext.timers(), directContext.state()); + + // At this point, if triggerRunner.shouldFire before the processValue then + // triggerRunner.shouldFire after the processValue. In other words adding values + // cannot take a trigger state from firing to non-firing. 
+ // (We don't actually assert this since it is too slow.) } return windows; @@ -568,6 +573,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain() && timer.getTimestamp().equals(window.maxTimestamp()); if (isEndOfWindow) { + // If the window strategy trigger includes a watermark trigger then at this point + // there should be no data holds, either because we'd already cleared them on an + // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate. + // We could assert this but it is very expensive. + // Since we are processing an on-time firing we should schedule the garbage collection // timer. (If getAllowedLateness is zero then the timer event will be considered a // cleanup event and handled by the above). @@ -715,8 +725,11 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { ReduceFn<K, InputT, OutputT, W>.Context renamedContext, boolean isFinished) throws Exception { + Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + // Prefetch necessary states - ReadableState<Instant> outputTimestampFuture = + ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture = watermarkHold.extractAndRelease(renamedContext, isFinished).readLater(); ReadableState<PaneInfo> paneFuture = paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater(); @@ -729,7 +742,41 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> { // Calculate the pane info. final PaneInfo pane = paneFuture.read(); // Extract the window hold, and as a side effect clear it. - final Instant outputTimestamp = outputTimestampFuture.read(); + + WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read(); + final Instant outputTimestamp = pair.oldHold; + @Nullable Instant newHold = pair.newHold; + + if (newHold != null && inputWM != null) { + // We can't be finished yet. 
+ Preconditions.checkState( + !isFinished, "new hold at %s but finished %s", newHold, directContext.window()); + // The hold cannot be behind the input watermark. + Preconditions.checkState( + !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM); + if (newHold.isAfter(directContext.window().maxTimestamp())) { + // The hold must be for garbage collection, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual( + directContext.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness())), + "new hold %s should be at garbage collection for window %s plus %s", + newHold, + directContext.window(), + windowingStrategy.getAllowedLateness()); + } else { + // The hold must be for the end-of-window, which can't have happened yet. + Preconditions.checkState( + newHold.isEqual(directContext.window().maxTimestamp()), + "new hold %s should be at end of window %s", + newHold, + directContext.window()); + Preconditions.checkState( + !isEndOfWindow, + "new hold at %s for %s but this is the watermark trigger", + newHold, + directContext.window()); + } + } // Only emit a pane if it has data or empty panes are observable. 
if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java index dcfd035..8fc4981 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/TriggerRunner.java @@ -172,6 +172,8 @@ public class TriggerRunner<W extends BoundedWindow> { } public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception { + // shouldFire should be true. + // However it is too expensive to assert. FinishedTriggersBitSet finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); Trigger<W>.TriggerContext context = contextFactory.base(window, timers, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1c89a1b3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java index d537ddb..31e36c5 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/WatermarkHold.java @@ -228,6 +228,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); // Only add the hold if we can be sure: // - 
the backend will be able to respect it @@ -287,6 +288,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { // by the end of window (ie the end of window is at or ahead of the input watermark). Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + String which; boolean tooLate; Instant eowHold = context.window().maxTimestamp(); @@ -329,6 +332,8 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { Instant gcHold = context.window().maxTimestamp().plus(windowingStrategy.getAllowedLateness()); Instant outputWM = timerInternals.currentOutputWatermarkTime(); Instant inputWM = timerInternals.currentInputWatermarkTime(); + Preconditions.checkNotNull(inputWM); + WindowTracing.trace( "WatermarkHold.addGarbageCollectionHold: garbage collection at {} hold for " + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -369,6 +374,19 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { } /** + * Result of {@link #extractAndRelease}. + */ + public static class OldAndNewHolds { + public final Instant oldHold; + @Nullable public final Instant newHold; + + public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { + this.oldHold = oldHold; + this.newHold = newHold; + } + } + + /** * Return (a future for) the earliest hold for {@code context}. Clear all the holds after * reading, but add/restore an end-of-window or garbage collection hold if required. * @@ -377,7 +395,7 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { * elements in the current pane. If there is no such value the timestamp is the end * of the window. 
*/ - public ReadableState<Instant> extractAndRelease( + public ReadableState<OldAndNewHolds> extractAndRelease( final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) { WindowTracing.debug( "extractAndRelease: for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", @@ -385,38 +403,38 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { timerInternals.currentOutputWatermarkTime()); final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag); final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG); - return new ReadableState<Instant>() { + return new ReadableState<OldAndNewHolds>() { @Override - public ReadableState<Instant> readLater() { + public ReadableState<OldAndNewHolds> readLater() { elementHoldState.readLater(); extraHoldState.readLater(); return this; } @Override - public Instant read() { + public OldAndNewHolds read() { // Read both the element and extra holds. Instant elementHold = elementHoldState.read(); Instant extraHold = extraHoldState.read(); - Instant hold; + Instant oldHold; // Find the minimum, accounting for null. if (elementHold == null) { - hold = extraHold; + oldHold = extraHold; } else if (extraHold == null) { - hold = elementHold; + oldHold = elementHold; } else if (elementHold.isBefore(extraHold)) { - hold = elementHold; + oldHold = elementHold; } else { - hold = extraHold; + oldHold = extraHold; } - if (hold == null || hold.isAfter(context.window().maxTimestamp())) { + if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { // If no hold (eg because all elements came in behind the output watermark), or // the hold was for garbage collection, take the end of window as the result. 
WindowTracing.debug( "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " + "for key:{}; window:{}", - hold, context.key(), context.window()); - hold = context.window().maxTimestamp(); + oldHold, context.key(), context.window()); + oldHold = context.window().maxTimestamp(); } WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", context.key(), context.window()); @@ -425,13 +443,14 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { elementHoldState.clear(); extraHoldState.clear(); + @Nullable Instant newHold = null; if (!isFinished) { // Only need to leave behind an end-of-window or garbage collection hold // if future elements will be processed. - addEndOfWindowOrGarbageCollectionHolds(context); + newHold = addEndOfWindowOrGarbageCollectionHolds(context); } - return hold; + return new OldAndNewHolds(oldHold, newHold); } }; } @@ -447,4 +466,12 @@ class WatermarkHold<W extends BoundedWindow> implements Serializable { context.state().access(elementHoldTag).clear(); context.state().access(EXTRA_HOLD_TAG).clear(); } + + /** + * Return the current data hold, or null if none. Does not clear. For debugging only. + */ + @Nullable + public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) { + return context.state().access(elementHoldTag).read(); + } }
