http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java deleted file mode 100644 index 7404e1b..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java +++ /dev/null @@ -1,536 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.annotations.VisibleForTesting; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.Serializable; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateMerging; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Implements the logic to hold the output watermark for a computation back - * until it has seen all the elements it needs based on the input watermark for the - * computation. - * - * <p>The backend ensures the output watermark can never progress beyond the - * input watermark for a computation. GroupAlsoByWindows computations may add a 'hold' - * to the output watermark in order to prevent it progressing beyond a time within a window. - * The hold will be 'cleared' when the associated pane is emitted. - * - * <p>This class is only intended for use by {@link ReduceFnRunner}. The two evolve together and - * will likely break any other uses. - * - * @param <W> The kind of {@link BoundedWindow} the hold is for. - */ -class WatermarkHold<W extends BoundedWindow> implements Serializable { - /** - * Return tag for state containing the output watermark hold - * used for elements. 
- */ - public static <W extends BoundedWindow> - StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn( - OutputTimeFn<? super W> outputTimeFn) { - return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal( - StateTags.<W>watermarkStateInternal("hold", outputTimeFn)); - } - - /** - * Tag for state containing end-of-window and garbage collection output watermark holds. - * (We can't piggy-back on the data hold state since the outputTimeFn may be - * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane - * would take the end-of-window time as its element time.) - */ - @VisibleForTesting - public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG = - StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal( - "extra", OutputTimeFns.outputAtEarliestInputTimestamp())); - - private final TimerInternals timerInternals; - private final WindowingStrategy<?, W> windowingStrategy; - private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag; - - public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) { - this.timerInternals = timerInternals; - this.windowingStrategy = windowingStrategy; - this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn()); - } - - /** - * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp - * of the element in {@code context}. We allow the actual hold time to be shifted later by - * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will - * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold - * was placed, or {@literal null} if no hold was placed. 
- * - * <p>In the following we'll write {@code E} to represent an element's timestamp after passing - * through the window strategy's output time function, {@code IWM} for the local input watermark, - * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection - * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right, - * and we write {@code [ ... ]} to denote a bounded window with implied lower bound. - * - * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness} - * is {@code ZERO}. - * - * <p>Here are the cases we need to handle. They are conceptually considered in the - * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM. - * <ol> - * <li>(Normal) - * <pre> - * | - * [ | E ] - * | - * IWM - * </pre> - * This is, hopefully, the common and happy case. The element is locally on-time and can - * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer - * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's - * timestamp (depending on the output time function). Thus the OWM will not proceed past E - * until the next pane fires. - * - * <li>(Discard - no target window) - * <pre> - * | | - * [ E ] | | - * | | - * GCWM <-getAllowedLateness-> IWM - * </pre> - * The element is very locally late. The window has been garbage collected, thus there - * is no target pane E could be assigned to. We discard E. - * - * <li>(Unobservably late) - * <pre> - * | | - * [ | E | ] - * | | - * OWM IWM - * </pre> - * The element is locally late, however we can still treat this case as for 'Normal' above - * since the IWM has not yet passed the end of the window and the element is ahead of the - * OWM. In effect, we get to 'launder' the locally late element and consider it as locally - * on-time because no downstream computation can observe the difference. 
- * - * <li>(Maybe late 1) - * <pre> - * | | - * [ | E ] | - * | | - * OWM IWM - * </pre> - * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME} - * pane may have already been emitted. However, if timer firings have been delayed then it - * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element - * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find - * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's - * timestamp. We may however set a garbage collection hold if required. - * - * <li>(Maybe late 2) - * <pre> - * | | - * [ E | | ] - * | | - * OWM IWM - * </pre> - * The end-of-window timer has not yet fired, so this element may still appear in an - * {@code ON_TIME} pane. However the element is too late to contribute to the output - * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an - * end-of-window hold. - * - * <li>(Maybe late 3) - * <pre> - * | | - * [ E | ] | - * | | - * OWM IWM - * </pre> - * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer - * has already fired, or it is about to fire. We can place only the garbage collection hold, - * if required. - * - * <li>(Definitely late) - * <pre> - * | | - * [ E ] | | - * | | - * OWM IWM - * </pre> - * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to - * place an end-of-window hold. We can still place a garbage collection hold if required. - * - * </ol> - */ - @Nullable - public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) { - Instant hold = addElementHold(context); - if (hold == null) { - hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/); - } - return hold; - } - - /** - * Return {@code timestamp}, possibly shifted forward in time according to the window - * strategy's output time function. 
- */ - private Instant shift(Instant timestamp, W window) { - Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); - checkState(!shifted.isBefore(timestamp), - "OutputTimeFn moved element from %s to earlier time %s for window %s", - timestamp, shifted, window); - checkState(timestamp.isAfter(window.maxTimestamp()) - || !shifted.isAfter(window.maxTimestamp()), - "OutputTimeFn moved element from %s to %s which is beyond end of " - + "window %s", - timestamp, shifted, window); - - return shifted; - } - - /** - * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was - * added (ie the element timestamp plus any forward shift requested by the - * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added. - * The hold is only added if both: - * <ol> - * <li>The backend will be able to respect it. In other words the output watermark cannot - * be ahead of the proposed hold time. - * <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the - * window. In other words the input watermark cannot be ahead of the end of the window. - * </ol> - * The hold ensures the pane which incorporates the element will not be considered late by - * any downstream computation when it is eventually emitted. - */ - @Nullable - private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context) { - // Give the window function a chance to move the hold timestamp forward to encourage progress. - // (A later hold implies less impediment to the output watermark making progress, which in - // turn encourages end-of-window triggers to fire earlier in following computations.) - Instant elementHold = shift(context.timestamp(), context.window()); - - Instant outputWM = timerInternals.currentOutputWatermarkTime(); - Instant inputWM = timerInternals.currentInputWatermarkTime(); - - String which; - boolean tooLate; - // TODO: These case labels could be tightened. 
- // See the case analysis in addHolds above for the motivation. - if (outputWM != null && elementHold.isBefore(outputWM)) { - which = "too late to effect output watermark"; - tooLate = true; - } else if (context.window().maxTimestamp().isBefore(inputWM)) { - which = "too late for end-of-window timer"; - tooLate = true; - } else { - which = "on time"; - tooLate = false; - checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Element hold %s is beyond end-of-time", elementHold); - context.state().access(elementHoldTag).add(elementHold); - } - WindowTracing.trace( - "WatermarkHold.addHolds: element hold at {} is {} for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - elementHold, which, context.key(), context.window(), inputWM, - outputWM); - - return tooLate ? null : elementHold; - } - - /** - * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required). - * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added. - */ - @Nullable - private Instant addEndOfWindowOrGarbageCollectionHolds( - ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { - Instant hold = addEndOfWindowHold(context, paneIsEmpty); - if (hold == null) { - hold = addGarbageCollectionHold(context, paneIsEmpty); - } - return hold; - } - - /** - * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added - * (ie the end of window time), or {@literal null} if no end of window hold is possible and we - * should fallback to a garbage collection hold. - * - * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner}) - * to clear it. In other words, the input watermark cannot be ahead of the end of window time. 
- * - * <p>An end-of-window hold is added in two situations: - * <ol> - * <li>An incoming element came in behind the output watermark (so we are too late for placing - * the usual element hold), but it may still be possible to include the element in an - * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will - * not be considered late by any downstream computation. - * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at - * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in - * a pane are processed due to a fired trigger we must set both an end of window timer and an end - * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered - * late by any downstream computation. - * </ol> - */ - @Nullable - private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { - Instant outputWM = timerInternals.currentOutputWatermarkTime(); - Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant eowHold = context.window().maxTimestamp(); - - if (eowHold.isBefore(inputWM)) { - WindowTracing.trace( - "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for " - + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - eowHold, context.key(), context.window(), inputWM, outputWM); - return null; - } - - checkState(outputWM == null || !eowHold.isBefore(outputWM), - "End-of-window hold %s cannot be before output watermark %s", - eowHold, outputWM); - checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "End-of-window hold %s is beyond end-of-time", eowHold); - // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep - // the hold away from the combining function in elementHoldTag. - // However if !paneIsEmpty then it could make sense to use the elementHoldTag here. 
- // Alas, onMerge is forced to add an end of window or garbage collection hold without - // knowing whether an element hold is already in place (stopping to check is too expensive). - // Thus it would end up adding an element hold at the end of the window which could - // upset the elementHoldTag combining function. - context.state().access(EXTRA_HOLD_TAG).add(eowHold); - WindowTracing.trace( - "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - eowHold, context.key(), context.window(), inputWM, outputWM); - return eowHold; - } - - /** - * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at - * which the hold was added (ie the end of window time plus allowed lateness), - * or {@literal null} if no hold was added. - * - * <p>We only add the hold if it is distinct from what would be added by - * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness} - * must be non-zero. - * - * <p>A garbage collection hold is added in two situations: - * <ol> - * <li>An incoming element came in behind the output watermark, and was too late for placing - * the usual element hold or an end of window hold. Place the garbage collection hold so that - * we can guarantee when the pane is finally triggered its output will not be dropped due to - * excessive lateness by any downstream computation. - * <li>The {@link WindowingStrategy#getClosingBehavior()} is - * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted - * for all windows which saw at least one element. Again, the garbage collection hold guarantees - * that any empty final pane can be given a timestamp which will not be considered beyond - * allowed lateness by any downstream computation. - * </ol> - * - * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2. 
- */ - @Nullable - private Instant addGarbageCollectionHold( - ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { - Instant outputWM = timerInternals.currentOutputWatermarkTime(); - Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant eow = context.window().maxTimestamp(); - Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness()); - - if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) { - WindowTracing.trace( - "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " - + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - gcHold, context.key(), context.window(), inputWM, outputWM); - return null; - } - - if (paneIsEmpty && context.windowingStrategy().getClosingBehavior() - == ClosingBehavior.FIRE_IF_NON_EMPTY) { - WindowTracing.trace( - "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " - + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - gcHold, context.key(), context.window(), inputWM, outputWM); - return null; - } - - checkState(!gcHold.isBefore(inputWM), - "Garbage collection hold %s cannot be before input watermark %s", - gcHold, inputWM); - checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Garbage collection hold %s is beyond end-of-time", gcHold); - // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above. - context.state().access(EXTRA_HOLD_TAG).add(gcHold); - - WindowTracing.trace( - "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - gcHold, context.key(), context.window(), inputWM, outputWM); - return gcHold; - } - - /** - * Prefetch watermark holds in preparation for merging. 
- */ - public void prefetchOnMerge(MergingStateAccessor<?, W> state) { - StateMerging.prefetchWatermarks(state, elementHoldTag); - } - - /** - * Updates the watermark hold when windows merge if it is possible the merged value does - * not equal all of the existing holds. For example, if the new window implies a later - * watermark hold, then earlier holds may be released. - */ - public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) { - WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - context.key(), context.window(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window()); - // If we had a cheap way to determine if we have an element hold then we could - // avoid adding an unnecessary end-of-window or garbage collection hold. - // Simply reading the above merged watermark would impose an additional read for the - // common case that the active window has just one underlying state address window and - // the hold depends on the min of the element timestamps. - // At least one merged window must be non-empty for the merge to have been triggered. - StateMerging.clear(context.state(), EXTRA_HOLD_TAG); - addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/); - } - - /** - * Result of {@link #extractAndRelease}. - */ - public static class OldAndNewHolds { - public final Instant oldHold; - @Nullable - public final Instant newHold; - - public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { - this.oldHold = oldHold; - this.newHold = newHold; - } - } - - /** - * Return (a future for) the earliest hold for {@code context}. Clear all the holds after - * reading, but add/restore an end-of-window or garbage collection hold if required. 
- * - * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn} - * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late - * elements in the current pane. If there is no such value the timestamp is the end - * of the window. - */ - public ReadableState<OldAndNewHolds> extractAndRelease( - final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) { - WindowTracing.debug( - "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - context.key(), context.window(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag); - final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG); - return new ReadableState<OldAndNewHolds>() { - @Override - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "") - public ReadableState<OldAndNewHolds> readLater() { - elementHoldState.readLater(); - extraHoldState.readLater(); - return this; - } - - @Override - public OldAndNewHolds read() { - // Read both the element and extra holds. - Instant elementHold = elementHoldState.read(); - Instant extraHold = extraHoldState.read(); - Instant oldHold; - // Find the minimum, accounting for null. - if (elementHold == null) { - oldHold = extraHold; - } else if (extraHold == null) { - oldHold = elementHold; - } else if (elementHold.isBefore(extraHold)) { - oldHold = elementHold; - } else { - oldHold = extraHold; - } - if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { - // If no hold (eg because all elements came in behind the output watermark), or - // the hold was for garbage collection, take the end of window as the result. 
- WindowTracing.debug( - "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " - + "for key:{}; window:{}", - oldHold, context.key(), context.window()); - oldHold = context.window().maxTimestamp(); - } - WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", - context.key(), context.window()); - - // Clear the underlying state to allow the output watermark to progress. - elementHoldState.clear(); - extraHoldState.clear(); - - @Nullable Instant newHold = null; - if (!isFinished) { - // Only need to leave behind an end-of-window or garbage collection hold - // if future elements will be processed. - newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/); - } - - return new OldAndNewHolds(oldHold, newHold); - } - }; - } - - /** - * Clear any remaining holds. - */ - public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) { - WindowTracing.debug( - "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - context.key(), context.window(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - context.state().access(elementHoldTag).clear(); - context.state().access(EXTRA_HOLD_TAG).clear(); - } - - /** - * Return the current data hold, or null if none. Does not clear. For debugging only. - */ - @Nullable - public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) { - return context.state().access(elementHoldTag).read(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservable.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservable.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservable.java deleted file mode 100644 index 613aa4b..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservable.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util.common; - -/** - * An interface for things that allow observing the size in bytes of - * encoded values of type {@code T}. - * - * @param <T> the type of the values being observed - */ -public interface ElementByteSizeObservable<T> { - /** - * Returns whether {@link #registerByteSizeObserver} is cheap enough - * to call for every element, that is, if this - * {@code ElementByteSizeObservable} can calculate the byte size of - * the element to be coded in roughly constant time (or lazily). 
- */ - public boolean isRegisterByteSizeObserverCheap(T value); - - /** - * Notifies the {@code ElementByteSizeObserver} about the byte size - * of the encoded value using this {@code ElementByteSizeObservable}. - */ - public void registerByteSizeObserver(T value, - ElementByteSizeObserver observer) - throws Exception; -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/PeekingReiterator.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/PeekingReiterator.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/PeekingReiterator.java deleted file mode 100644 index 1e3c17f..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/PeekingReiterator.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util.common; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; - -import java.util.NoSuchElementException; - -/** - * A {@link Reiterator} that supports one-element lookahead during iteration. - * - * @param <T> the type of elements returned by this iterator - */ -public final class PeekingReiterator<T> implements Reiterator<T> { - private T nextElement; - private boolean nextElementComputed; - private final Reiterator<T> iterator; - - public PeekingReiterator(Reiterator<T> iterator) { - this.iterator = checkNotNull(iterator); - } - - PeekingReiterator(PeekingReiterator<T> it) { - this.iterator = checkNotNull(checkNotNull(it).iterator.copy()); - this.nextElement = it.nextElement; - this.nextElementComputed = it.nextElementComputed; - } - - @Override - public boolean hasNext() { - computeNext(); - return nextElementComputed; - } - - @Override - public T next() { - T result = peek(); - nextElementComputed = false; - return result; - } - - /** - * {@inheritDoc} - * - * <p>If {@link #peek} is called, {@code remove} is disallowed until - * {@link #next} has been subsequently called. - */ - @Override - public void remove() { - checkState(!nextElementComputed, - "After peek(), remove() is disallowed until next() is called"); - iterator.remove(); - } - - @Override - public PeekingReiterator<T> copy() { - return new PeekingReiterator<>(this); - } - - /** - * Returns the element that would be returned by {@link #next}, without - * actually consuming the element. 
- * @throws NoSuchElementException if there is no next element - */ - public T peek() { - computeNext(); - if (!nextElementComputed) { - throw new NoSuchElementException(); - } - return nextElement; - } - - private void computeNext() { - if (nextElementComputed) { - return; - } - if (!iterator.hasNext()) { - return; - } - nextElement = iterator.next(); - nextElementComputed = true; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/package-info.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/package-info.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/package-info.java deleted file mode 100644 index 1ea8af8..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/common/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Defines utilities shared by multiple PipelineRunner implementations. 
*/ -package org.apache.beam.sdk.util.common; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/package-info.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/package-info.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/package-info.java deleted file mode 100644 index b4772f3..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/package-info.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** Defines utilities that can be used by Beam runners. 
*/ -package org.apache.beam.sdk.util; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java new file mode 100644 index 0000000..122e60c --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaceForTest; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; + +/** + * Tests for {@link BatchTimerInternals}. + */ +@RunWith(JUnit4.class) +public class BatchTimerInternalsTest { + + private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); + + @Mock + private ReduceFnRunner<?, ?, ?, ?> mockRunner; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testFiringTimers() throws Exception { + BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); + TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + + underTest.setTimer(processingTime1); + underTest.setTimer(processingTime2); + + underTest.advanceProcessingTime(mockRunner, new Instant(20)); + Mockito.verify(mockRunner).onTimer(processingTime1); + Mockito.verifyNoMoreInteractions(mockRunner); + + // Advancing just a little shouldn't refire + underTest.advanceProcessingTime(mockRunner, new Instant(21)); + Mockito.verifyNoMoreInteractions(mockRunner); + + // Adding the timer and advancing a little should refire + underTest.setTimer(processingTime1); + Mockito.verify(mockRunner).onTimer(processingTime1); + underTest.advanceProcessingTime(mockRunner, new Instant(21)); + Mockito.verifyNoMoreInteractions(mockRunner); + + // And advancing the rest of the way should still have the other timer + underTest.advanceProcessingTime(mockRunner, new Instant(30)); + 
Mockito.verify(mockRunner).onTimer(processingTime2); + Mockito.verifyNoMoreInteractions(mockRunner); + } + + @Test + public void testTimerOrdering() throws Exception { + BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); + TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); + TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME); + TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); + + underTest.setTimer(processingTime1); + underTest.setTimer(watermarkTime1); + underTest.setTimer(processingTime2); + underTest.setTimer(watermarkTime2); + + underTest.advanceInputWatermark(mockRunner, new Instant(30)); + Mockito.verify(mockRunner).onTimer(watermarkTime1); + Mockito.verify(mockRunner).onTimer(watermarkTime2); + Mockito.verifyNoMoreInteractions(mockRunner); + + underTest.advanceProcessingTime(mockRunner, new Instant(30)); + Mockito.verify(mockRunner).onTimer(processingTime1); + Mockito.verify(mockRunner).onTimer(processingTime2); + Mockito.verifyNoMoreInteractions(mockRunner); + } + + @Test + public void testDeduplicate() throws Exception { + BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); + TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); + TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); + underTest.setTimer(watermarkTime); + underTest.setTimer(watermarkTime); + underTest.setTimer(processingTime); + underTest.setTimer(processingTime); + underTest.advanceProcessingTime(mockRunner, new Instant(20)); + underTest.advanceInputWatermark(mockRunner, new Instant(20)); + + Mockito.verify(mockRunner).onTimer(processingTime); + Mockito.verify(mockRunner).onTimer(watermarkTime); + Mockito.verifyNoMoreInteractions(mockRunner); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java new file mode 100644 index 0000000..d1e0c68 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java @@ -0,0 +1,660 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertThat; + +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.DoFnTester; +import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Sessions; +import org.apache.beam.sdk.transforms.windowing.SlidingWindows; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Properties of {@link GroupAlsoByWindowsDoFn}. + * + * <p>Some properties may not hold of some implementations, due to restrictions on the context + * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not + * support merging windows. 
+ */ +public class GroupAlsoByWindowsProperties { + + /** + * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide + * the appropriate windowing strategy under test. + */ + public interface GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> { + <W extends BoundedWindow> GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> + forStrategy(WindowingStrategy<?, W> strategy, StateInternalsFactory<K> stateInternalsFactory); + } + + /** + * Tests that for empty input and the given {@link WindowingStrategy}, the provided GABW + * implementation produces no output. + * + * <p>The input type is deliberately left as a wildcard, since it is not relevant. + */ + public static <K, InputT, OutputT> void emptyInputEmptyOutput( + GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); + + // This key should never actually be used, though it is eagerly passed to the + // StateInternalsFactory so must be non-null + @SuppressWarnings("unchecked") + K fakeKey = (K) "this key should never be used"; + + DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> result = runGABW( + gabwFactory, + windowingStrategy, + fakeKey, + Collections.<WindowedValue<InputT>>emptyList()); + + assertThat(result.peekOutputElements(), hasSize(0)); + } + + /** + * Tests that for a simple sequence of elements on the same key, the given GABW implementation + * correctly groups them according to fixed windows. 
+ */ + public static void groupsElementsIntoFixedWindows( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); + + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "key", + WindowedValue.of( + "v1", + new Instant(1), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(2), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v3", + new Instant(13), + Arrays.asList(window(10, 20)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); + assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); + assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); + assertThat(item1.getValue().getValue(), contains("v3")); + assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); + } + + /** + * Tests that for a simple sequence of elements on the same key, the given GABW implementation + * correctly groups them into sliding windows. + * + * <p>In the input here, each element occurs in multiple windows. 
+ */ + public static void groupsElementsIntoSlidingWindowsWithMinTimestamp( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of( + SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); + + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "key", + WindowedValue.of( + "v1", + new Instant(5), + Arrays.asList(window(-10, 10), window(0, 20)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(15), + Arrays.asList(window(0, 20), window(10, 30)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(3)); + + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); + assertThat(item0.getValue().getValue(), contains("v1")); + assertThat(item0.getTimestamp(), equalTo(new Instant(5))); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); + assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2")); + // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window + assertThat(item1.getTimestamp(), equalTo(new Instant(10))); + + TimestampedValue<KV<String, Iterable<String>>> item2 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); + assertThat(item2.getValue().getValue(), contains("v2")); + // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window + assertThat(item2.getTimestamp(), equalTo(new Instant(20))); + } + + /** + * Tests that for a simple sequence of elements on the same key, the given GABW implementation + * correctly groups and combines them according to sliding windows. 
+ * + * <p>In the input here, each element occurs in multiple windows. + */ + public static void combinesElementsInSlidingWindows( + GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory, + CombineFn<Long, ?, Long> combineFn) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); + + DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result = + runGABW(gabwFactory, windowingStrategy, "k", + WindowedValue.of( + 1L, + new Instant(5), + Arrays.asList(window(-10, 10), window(0, 20)), + PaneInfo.NO_FIRING), + WindowedValue.of( + 2L, + new Instant(15), + Arrays.asList(window(0, 20), window(10, 30)), + PaneInfo.NO_FIRING), + WindowedValue.of( + 4L, + new Instant(18), + Arrays.asList(window(0, 20), window(10, 30)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(3)); + + TimestampedValue<KV<String, Long>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); + assertThat(item0.getValue().getKey(), equalTo("k")); + assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L)))); + assertThat(item0.getTimestamp(), equalTo(new Instant(5L))); + + TimestampedValue<KV<String, Long>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); + assertThat(item1.getValue().getKey(), equalTo("k")); + assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L)))); + // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window + assertThat(item1.getTimestamp(), equalTo(new Instant(10L))); + + TimestampedValue<KV<String, Long>> item2 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); + assertThat(item2.getValue().getKey(), equalTo("k")); + assertThat(item2.getValue().getValue(), 
equalTo(combineFn.apply(ImmutableList.of(2L, 4L)))); + // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window + assertThat(item2.getTimestamp(), equalTo(new Instant(20L))); + } + + /** + * Tests that the given GABW implementation correctly groups elements that fall into overlapping + * windows that are not merged. + */ + public static void groupsIntoOverlappingNonmergingWindows( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); + + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "key", + WindowedValue.of( + "v1", + new Instant(1), + Arrays.asList(window(0, 5)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(4), + Arrays.asList(window(1, 5)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v3", + new Instant(4), + Arrays.asList(window(0, 5)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5))); + assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3")); + assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp())); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5))); + assertThat(item1.getValue().getValue(), contains("v2")); + assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp())); + } + + /** + * Tests that the given GABW implementation correctly groups elements into merged sessions. 
+ */ + public static void groupsElementsInMergedSessions( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); + + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "key", + WindowedValue.of( + "v1", + new Instant(0), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(5), + Arrays.asList(window(5, 15)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v3", + new Instant(15), + Arrays.asList(window(15, 25)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); + assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); + assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); + assertThat(item1.getValue().getValue(), contains("v3")); + assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); + } + + /** + * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per + * session window correctly according to the provided {@link CombineFn}. 
+ */ + public static void combinesElementsPerSession( + GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory, + CombineFn<Long, ?, Long> combineFn) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); + + DoFnTester<KV<String, Iterable<WindowedValue<Long>>>, KV<String, Long>> result = + runGABW(gabwFactory, windowingStrategy, "k", + WindowedValue.of( + 1L, + new Instant(0), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + 2L, + new Instant(5), + Arrays.asList(window(5, 15)), + PaneInfo.NO_FIRING), + WindowedValue.of( + 4L, + new Instant(15), + Arrays.asList(window(15, 25)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue<KV<String, Long>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); + assertThat(item0.getValue().getKey(), equalTo("k")); + assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); + assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); + + TimestampedValue<KV<String, Long>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); + assertThat(item1.getValue().getKey(), equalTo("k")); + assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); + assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); + } + + /** + * Tests that for a simple sequence of elements on the same key, the given GABW implementation + * correctly groups them according to fixed windows and also sets the output timestamp + * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}. 
+ */ + public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "key", + WindowedValue.of( + "v1", + new Instant(1), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(2), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v3", + new Instant(13), + Arrays.asList(window(10, 20)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); + assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); + assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); + assertThat(item1.getValue().getValue(), contains("v3")); + assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); + } + + /** + * Tests that for a simple sequence of elements on the same key, the given GABW implementation + * correctly groups them according to fixed windows and also sets the output timestamp + * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}. 
+ */ + public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); + + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "k", + WindowedValue.of( + "v1", + new Instant(1), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(2), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v3", + new Instant(13), + Arrays.asList(window(10, 20)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); + assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); + assertThat(item0.getTimestamp(), equalTo(new Instant(2))); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); + assertThat(item1.getValue().getValue(), contains("v3")); + assertThat(item1.getTimestamp(), equalTo(new Instant(13))); + } + + /** + * Tests that the given GABW implementation correctly groups elements into merged sessions + * with output timestamps at the end of the merged window. 
+ */ + public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "k", + WindowedValue.of( + "v1", + new Instant(0), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(5), + Arrays.asList(window(5, 15)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v3", + new Instant(15), + Arrays.asList(window(15, 25)), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); + assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); + assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); + assertThat(item1.getValue().getValue(), contains("v3")); + assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); + } + + /** + * Tests that the given GABW implementation correctly groups elements into merged sessions + * with output timestamps at the end of the merged window. 
+ */ + public static void groupsElementsInMergedSessionsWithLatestTimestamp( + GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); + + BoundedWindow unmergedWindow = window(15, 25); + DoFnTester<KV<String, Iterable<WindowedValue<String>>>, KV<String, Iterable<String>>> result = + runGABW(gabwFactory, windowingStrategy, "k", + WindowedValue.of( + "v1", + new Instant(0), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v2", + new Instant(5), + Arrays.asList(window(5, 15)), + PaneInfo.NO_FIRING), + WindowedValue.of( + "v3", + new Instant(15), + Arrays.asList(unmergedWindow), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + BoundedWindow mergedWindow = window(0, 15); + TimestampedValue<KV<String, Iterable<String>>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow)); + assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); + assertThat(item0.getTimestamp(), equalTo(new Instant(5))); + + TimestampedValue<KV<String, Iterable<String>>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow)); + assertThat(item1.getValue().getValue(), contains("v3")); + assertThat(item1.getTimestamp(), equalTo(new Instant(15))); + } + + /** + * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per + * session window correctly according to the provided {@link CombineFn}. 
+ */ + public static void combinesElementsPerSessionWithEndOfWindowTimestamp( + GroupAlsoByWindowsDoFnFactory<String, Long, Long> gabwFactory, + CombineFn<Long, ?, Long> combineFn) + throws Exception { + + WindowingStrategy<?, IntervalWindow> windowingStrategy = + WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); + + BoundedWindow secondWindow = window(15, 25); + DoFnTester<?, KV<String, Long>> result = + runGABW(gabwFactory, windowingStrategy, "k", + WindowedValue.of( + 1L, + new Instant(0), + Arrays.asList(window(0, 10)), + PaneInfo.NO_FIRING), + WindowedValue.of( + 2L, + new Instant(5), + Arrays.asList(window(5, 15)), + PaneInfo.NO_FIRING), + WindowedValue.of( + 4L, + new Instant(15), + Arrays.asList(secondWindow), + PaneInfo.NO_FIRING)); + + assertThat(result.peekOutputElements(), hasSize(2)); + + BoundedWindow firstResultWindow = window(0, 15); + TimestampedValue<KV<String, Long>> item0 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow)); + assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); + assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp())); + + TimestampedValue<KV<String, Long>> item1 = + Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow)); + assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); + assertThat(item1.getTimestamp(), + equalTo(secondWindow.maxTimestamp())); + } + + @SafeVarargs + private static <K, InputT, OutputT, W extends BoundedWindow> + DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW( + GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory, + WindowingStrategy<?, W> windowingStrategy, + K key, + WindowedValue<InputT>... 
values) throws Exception { + return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values)); + } + + private static <K, InputT, OutputT, W extends BoundedWindow> + DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> runGABW( + GroupAlsoByWindowsDoFnFactory<K, InputT, OutputT> gabwFactory, + WindowingStrategy<?, W> windowingStrategy, + K key, + Collection<WindowedValue<InputT>> values) throws Exception { + + final StateInternalsFactory<K> stateInternalsCache = new CachingStateInternalsFactory<K>(); + + DoFnTester<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> tester = + DoFnTester.of(gabwFactory.forStrategy(windowingStrategy, stateInternalsCache)); + + // Though we use a DoFnTester, the function itself is instantiated directly by the + // runner and should not be serialized; it may not even be serializable. + tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE); + tester.startBundle(); + tester.processElement(KV.<K, Iterable<WindowedValue<InputT>>>of(key, values)); + tester.finishBundle(); + + // Sanity check for corruption + for (KV<K, OutputT> elem : tester.peekOutputElements()) { + assertThat(elem.getKey(), equalTo(key)); + } + + return tester; + } + + private static BoundedWindow window(long start, long end) { + return new IntervalWindow(new Instant(start), new Instant(end)); + } + + private static final class CachingStateInternalsFactory<K> implements StateInternalsFactory<K> { + private final LoadingCache<K, StateInternals<K>> stateInternalsCache; + + private CachingStateInternalsFactory() { + this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader<K>()); + } + + @Override + @SuppressWarnings("unchecked") + public StateInternals<K> stateInternalsForKey(K key) { + try { + return stateInternalsCache.get(key); + } catch (Exception exc) { + throw new RuntimeException(exc); + } + } + } + + private static class StateInternalsLoader<K> extends CacheLoader<K, StateInternals<K>> { + @Override + public 
StateInternals<K> load(K key) throws Exception { + return InMemoryStateInternals.forKey(key); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java new file mode 100644 index 0000000..1fad1fb --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFnTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.runners.core.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link GroupAlsoByWindowsViaOutputBufferDoFn}.
+ *
+ * <p>Each test hands a factory for the function under test to one of the shared checks in
+ * {@link GroupAlsoByWindowsProperties}; the assertions themselves live there.
+ */
+@RunWith(JUnit4.class)
+public class GroupAlsoByWindowsViaOutputBufferDoFnTest {
+
+  /**
+   * Factory that creates a {@link GroupAlsoByWindowsViaOutputBufferDoFn} using the buffering
+   * {@link SystemReduceFn} for the supplied input coder.
+   *
+   * <p>Declared {@code static} because it never uses the enclosing test instance; a non-static
+   * inner class would silently capture a reference to it.
+   */
+  private static final class BufferingGABWViaOutputBufferDoFnFactory<K, InputT>
+      implements GroupAlsoByWindowsDoFnFactory<K, InputT, Iterable<InputT>> {
+
+    private final Coder<InputT> inputCoder;
+
+    /**
+     * @param inputCoder coder for the input values, passed on to the buffering reduce function
+     */
+    public BufferingGABWViaOutputBufferDoFnFactory(Coder<InputT> inputCoder) {
+      this.inputCoder = inputCoder;
+    }
+
+    /**
+     * Creates the function under test for the given windowing strategy and state factory.
+     */
+    @Override
+    public <W extends BoundedWindow>
+        GroupAlsoByWindowsDoFn<K, InputT, Iterable<InputT>, W> forStrategy(
+            WindowingStrategy<?, W> windowingStrategy,
+            StateInternalsFactory<K> stateInternalsFactory) {
+      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, Iterable<InputT>, W>(
+          windowingStrategy,
+          stateInternalsFactory,
+          SystemReduceFn.<K, InputT, W>buffering(inputCoder));
+    }
+  }
+
+  @Test
+  public void testEmptyInputEmptyOutput() throws Exception {
+    GroupAlsoByWindowsProperties.emptyInputEmptyOutput(
+        new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoFixedWindows() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoSlidingWindows() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsIntoOverlappingNonmergingWindows() throws Exception {
+    GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsIntoSessions() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsInMergedSessions(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+
+  @Test
+  public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception {
+    GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp(
+        new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of()));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java new file mode 100644 index 0000000..1cf05b6 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunnerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.util.Arrays; +import org.apache.beam.runners.core.LateDataDroppingDoFnRunner.LateDataFilter; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** + * Unit tests for {@link LateDataDroppingDoFnRunner}. 
+ */
+@RunWith(JUnit4.class)
+public class LateDataDroppingDoFnRunnerTest {
+  /** Window function shared by the fixtures: fixed windows of 10 milliseconds. */
+  private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10));
+
+  @Mock private TimerInternals mockTimerInternals;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+  }
+
+  /**
+   * With the input watermark mocked at 15, filtering four elements must keep the three
+   * stamped 13, 16 and 18, drop the one stamped 5, and count exactly one dropped element.
+   */
+  @Test
+  public void testLateDataFilter() throws Exception {
+    when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15L));
+
+    InMemoryLongSumAggregator droppedDueToLateness =
+        new InMemoryLongSumAggregator("droppedDueToLateness");
+    LateDataFilter lateDataFilter = new LateDataFilter(
+        WindowingStrategy.of(WINDOW_FN), mockTimerInternals, droppedDueToLateness);
+
+    Iterable<WindowedValue<Integer>> actual = lateDataFilter.filter(
+        "a",
+        ImmutableList.of(
+            createDatum(13, 13L),
+            // Expected to be dropped as late: its 10 ms window lies entirely before the
+            // mocked input watermark of 15.
+            createDatum(5, 5L),
+            createDatum(16, 16L),
+            createDatum(18, 18L)));
+
+    Iterable<WindowedValue<Integer>> expected = ImmutableList.of(
+        createDatum(13, 13L),
+        createDatum(16, 16L),
+        createDatum(18, 18L));
+    // The filter output is the value under test and the fixture supplies the matcher, so a
+    // failure message reports what the filter actually produced.
+    assertThat(actual, containsInAnyOrder(Iterables.toArray(expected, WindowedValue.class)));
+    assertEquals(1, droppedDueToLateness.sum);
+  }
+
+  /**
+   * Wraps {@code element} in a {@link WindowedValue} stamped {@code timestampMillis}, placed
+   * in the window {@link #WINDOW_FN} assigns to that timestamp, with
+   * {@link PaneInfo#NO_FIRING}.
+   */
+  private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {
+    Instant timestamp = new Instant(timestampMillis);
+    return WindowedValue.of(
+        element,
+        timestamp,
+        Arrays.asList(WINDOW_FN.assignWindow(timestamp)),
+        PaneInfo.NO_FIRING);
+  }
+
+  /**
+   * Minimal {@link Aggregator} that keeps a running total in a field so the test can read
+   * it directly.
+   */
+  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
+    private final String name;
+    private long sum = 0;
+
+    public InMemoryLongSumAggregator(String name) {
+      this.name = name;
+    }
+
+    /** Adds {@code value} to the running total. */
+    @Override
+    public void addValue(Long value) {
+      sum += value;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    /** Returns a new {@link Sum.SumLongFn}; the total in {@code sum} is kept separately. */
+    @Override
+    public CombineFn<Long, ?, Long> getCombineFn() {
+      return new Sum.SumLongFn();
+    }
+  }
+}