http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java new file mode 100644 index 0000000..3948d9e --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SystemReduceFn.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + + +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; +import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.AppliedCombineFn; +import org.apache.beam.sdk.util.state.AccumulatorCombiningState; +import org.apache.beam.sdk.util.state.BagState; +import org.apache.beam.sdk.util.state.CombiningState; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; + +/** + * {@link ReduceFn} implementing the default reduction behaviors of {@link GroupByKey}. + * + * @param <K> The type of key being processed. + * @param <InputT> The type of values associated with the key. + * @param <AccumT> The accumulator type of the combining state (see {@link #combining}). + * @param <OutputT> The output type that will be produced for each key. + * @param <W> The type of windows this operates on. + */ +public abstract class SystemReduceFn<K, InputT, AccumT, OutputT, W extends BoundedWindow> + extends ReduceFn<K, InputT, OutputT, W> { + private static final String BUFFER_NAME = "buf"; + + /** + * Create a factory that produces {@link SystemReduceFn} instances that buffer all of the + * input values in persistent state and produce an {@code Iterable<T>}. 
+ */ + public static <K, T, W extends BoundedWindow> SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W> + buffering(final Coder<T> inputCoder) { + final StateTag<Object, BagState<T>> bufferTag = + StateTags.makeSystemTagInternal(StateTags.bag(BUFFER_NAME, inputCoder)); + return new SystemReduceFn<K, T, Iterable<T>, Iterable<T>, W>(bufferTag) { + @Override + public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception { + StateMerging.prefetchBags(state, bufferTag); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + StateMerging.mergeBags(c.state(), bufferTag); + } + }; + } + + /** + * Create a factory that produces {@link SystemReduceFn} instances that combine all of the input + * values using a {@link CombineFn}. + * + * <p>The state tag is created with {@code StateTags.keyedCombiningValueWithContext} when + * {@code combineFn.getFn()} is a {@link KeyedCombineFnWithContext}, and with + * {@code StateTags.keyedCombiningValue} otherwise. The {@code keyCoder} argument is not used by + * this method. + */ + public static <K, InputT, AccumT, OutputT, W extends BoundedWindow> SystemReduceFn<K, InputT, + AccumT, OutputT, W> + combining( + final Coder<K> keyCoder, final AppliedCombineFn<K, InputT, AccumT, OutputT> combineFn) { + final StateTag<K, AccumulatorCombiningState<InputT, AccumT, OutputT>> bufferTag; + if (combineFn.getFn() instanceof KeyedCombineFnWithContext) { + bufferTag = StateTags.makeSystemTagInternal( + StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValueWithContext( + BUFFER_NAME, combineFn.getAccumulatorCoder(), + (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) combineFn.getFn())); + + } else { + bufferTag = StateTags.makeSystemTagInternal( + StateTags.<K, InputT, AccumT, OutputT>keyedCombiningValue( + BUFFER_NAME, combineFn.getAccumulatorCoder(), + (KeyedCombineFn<K, InputT, AccumT, OutputT>) combineFn.getFn())); + } + return new SystemReduceFn<K, InputT, AccumT, OutputT, W>(bufferTag) { + @Override + public void prefetchOnMerge(MergingStateAccessor<K, W> state) throws Exception { + StateMerging.prefetchCombiningValues(state, bufferTag); + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + StateMerging.mergeCombiningValues(c.state(), bufferTag); + } + }; 
+ } + + private StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag; + + public SystemReduceFn( + StateTag<? super K, ? extends CombiningState<InputT, OutputT>> bufferTag) { + this.bufferTag = bufferTag; + } + + @Override + public void processValue(ProcessValueContext c) throws Exception { + c.state().access(bufferTag).add(c.value()); + } + + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchOnTrigger(StateAccessor<K> state) { + state.access(bufferTag).readLater(); + } + + @Override + public void onTrigger(OnTriggerContext c) throws Exception { + c.output(c.state().access(bufferTag).read()); + } + + @Override + public void clearState(Context c) throws Exception { + c.state().access(bufferTag).clear(); + } + + @Override + public ReadableState<Boolean> isEmpty(StateAccessor<K> state) { + return state.access(bufferTag).isEmpty(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java new file mode 100644 index 0000000..8d0f322 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.BitSet; +import java.util.Collection; +import java.util.Map; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.util.BitSetCoder; +import org.apache.beam.sdk.util.ExecutableTrigger; +import org.apache.beam.sdk.util.FinishedTriggers; +import org.apache.beam.sdk.util.FinishedTriggersBitSet; +import org.apache.beam.sdk.util.Timers; +import org.apache.beam.sdk.util.TriggerContextFactory; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.StateAccessor; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.ValueState; +import org.joda.time.Instant; + +/** + * Executes a trigger while managing persistence of information about which subtriggers are + * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger. 
+ * + * <p>Specifically, the responsibilities are: + * + * <ul> + * <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by + * constructing the appropriate trigger contexts.</li> + * <li>Committing a record of which subtriggers are finished to persistent state.</li> + * <li>Restoring the record of which subtriggers are finished from persistent state.</li> + * <li>Clearing out the persisted finished set when a caller indicates + * (via {@link #clearFinished}) that it is no longer needed.</li> + * </ul> + * + * <p>These responsibilities are intertwined: trigger contexts include mutable information about + * which subtriggers are finished. This class provides the information when building the contexts + * and commits the information when the method of the {@link ExecutableTrigger} returns. + * + * @param <W> The kind of windows being processed. + */ +public class TriggerRunner<W extends BoundedWindow> { + @VisibleForTesting + static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG = + StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of())); + + private final ExecutableTrigger rootTrigger; + private final TriggerContextFactory<W> contextFactory; + + public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) { + checkState(rootTrigger.getTriggerIndex() == 0); + this.rootTrigger = rootTrigger; + this.contextFactory = contextFactory; + } + + private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) { + if (!isFinishedSetNeeded()) { + // If no trigger in the tree will ever have finished bits, then we don't need to read them. + // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not + // finished) for each trigger in the tree. + return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()); + } + + BitSet bitSet = state.read(); + return bitSet == null + ? 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()) + : FinishedTriggersBitSet.fromBitSet(bitSet); + } + + + private void clearFinishedBits(ValueState<BitSet> state) { + if (!isFinishedSetNeeded()) { + // Nothing to clear. + return; + } + state.clear(); + } + + /** Return true if the trigger is closed in the window corresponding to the specified state. */ + public boolean isClosed(StateAccessor<?> state) { + return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchForValue(W window, StateAccessor<?> state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchOnElement( + contextFactory.createStateAccessor(window, rootTrigger)); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchOnFire(W window, StateAccessor<?> state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger)); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchShouldFire(W window, StateAccessor<?> state) { + if (isFinishedSetNeeded()) { + state.access(FINISHED_BITS_TAG).readLater(); + } + rootTrigger.getSpec().prefetchShouldFire( + contextFactory.createStateAccessor(window, rootTrigger)); + } + + /** + * Run the trigger logic to deal with a new value: invoke the root trigger's + * {@code onElement} handling on a copy of the finished set, then hand that set to + * {@code persistFinishedSet}. + */ + public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state) + throws Exception { + // Clone so that we can detect changes and so that changes here don't pollute merging. 
+ FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext( + window, timers, timestamp, rootTrigger, finishedSet); + rootTrigger.invokeOnElement(triggerContext); + persistFinishedSet(state, finishedSet); + } + + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "prefetch side effect") + public void prefetchForMerge( + W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) { + if (isFinishedSetNeeded()) { + for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) { + value.readLater(); + } + } + rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor( + window, mergingWindows, rootTrigger)); + } + + /** + * Run the trigger merging logic as part of executing the specified merge. The finished bits of + * each merging window are read and passed to {@code clearFinishedBits} before the root + * trigger's merge is invoked. + */ + public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception { + // Clone so that we can detect changes and so that changes here don't pollute merging. + FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + + // And read the finished bits in each merging window. + ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder(); + for (Map.Entry<W, ValueState<BitSet>> entry : + state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) { + // Don't need to clone these, since the trigger context doesn't allow modification + builder.put(entry.getKey(), readFinishedBits(entry.getValue())); + // Clear the underlying finished bits. 
+ clearFinishedBits(entry.getValue()); + } + ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build(); + + Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext( + window, timers, rootTrigger, finishedSet, mergingFinishedSets); + + // Run the merge from the trigger + rootTrigger.invokeOnMerge(mergeContext); + + persistFinishedSet(state, finishedSet); + } + + public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception { + FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + Trigger.TriggerContext context = contextFactory.base(window, timers, + rootTrigger, finishedSet); + return rootTrigger.invokeShouldFire(context); + } + + public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception { + // shouldFire should be false. + // However it is too expensive to assert. + // NOTE(review): "false" above looks inverted; presumably onFire is only invoked once + // shouldFire has returned true. Confirm against the caller. + FinishedTriggersBitSet finishedSet = + readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); + Trigger.TriggerContext context = contextFactory.base(window, timers, + rootTrigger, finishedSet); + rootTrigger.invokeOnFire(context); + persistFinishedSet(state, finishedSet); + } + + private void persistFinishedSet( + StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) { + if (!isFinishedSetNeeded()) { + return; + } + + ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG); + if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) { + if (modifiedFinishedSet.getBitSet().isEmpty()) { + finishedSetState.clear(); + } else { + finishedSetState.write(modifiedFinishedSet.getBitSet()); + } + } + } + + /** + * Clear the finished bits. + */ + public void clearFinished(StateAccessor<?> state) { + clearFinishedBits(state.access(FINISHED_BITS_TAG)); + } + + /** + * Clear the state used for executing triggers, but leave the finished set to indicate + * the window is closed. 
+ */ + public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception { + // Don't need to clone, because we'll be clearing the finished bits anyways. + FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)); + rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet)); + } + + private boolean isFinishedSetNeeded() { + // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the + // lookup. Right now, we special case this for the DefaultTrigger. + return !(rootTrigger.getSpec() instanceof DefaultTrigger); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java new file mode 100644 index 0000000..7d0b608 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -0,0 +1,539 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.annotations.VisibleForTesting; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.Serializable; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; +import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.MergingStateAccessor; +import org.apache.beam.sdk.util.state.ReadableState; +import org.apache.beam.sdk.util.state.StateMerging; +import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.StateTags; +import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.joda.time.Duration; +import org.joda.time.Instant; + +/** + * Implements the logic to hold the output watermark for a computation back + * until it has seen all the elements it needs based on the input watermark for the + * computation. + * + * <p>The backend ensures the output watermark can never progress beyond the + * input watermark for a computation. GroupAlsoByWindows computations may add a 'hold' + * to the output watermark in order to prevent it progressing beyond a time within a window. + * The hold will be 'cleared' when the associated pane is emitted. + * + * <p>This class is only intended for use by {@link ReduceFnRunner}. The two evolve together and + * will likely break any other uses. + * + * @param <W> The kind of {@link BoundedWindow} the hold is for. 
+ */ +class WatermarkHold<W extends BoundedWindow> implements Serializable { + /** + * Return tag for state containing the output watermark hold + * used for elements. + */ + public static <W extends BoundedWindow> + StateTag<Object, WatermarkHoldState<W>> watermarkHoldTagForOutputTimeFn( + OutputTimeFn<? super W> outputTimeFn) { + return StateTags.<Object, WatermarkHoldState<W>>makeSystemTagInternal( + StateTags.<W>watermarkStateInternal("hold", outputTimeFn)); + } + + /** + * Tag for state containing end-of-window and garbage collection output watermark holds. + * (We can't piggy-back on the data hold state since the outputTimeFn may be + * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane + * would take the end-of-window time as its element time.) + */ + @VisibleForTesting + public static final StateTag<Object, WatermarkHoldState<BoundedWindow>> EXTRA_HOLD_TAG = + StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal( + "extra", OutputTimeFns.outputAtEarliestInputTimestamp())); + + private final TimerInternals timerInternals; + private final WindowingStrategy<?, W> windowingStrategy; + private final StateTag<Object, WatermarkHoldState<W>> elementHoldTag; + + public WatermarkHold(TimerInternals timerInternals, WindowingStrategy<?, W> windowingStrategy) { + this.timerInternals = timerInternals; + this.windowingStrategy = windowingStrategy; + this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn()); + } + + /** + * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp + * of the element in {@code context}. We allow the actual hold time to be shifted later by + * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will + * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold + * was placed, or {@literal null} if no hold was placed. 
+ * + * <p>In the following we'll write {@code E} to represent an element's timestamp after passing + * through the window strategy's output time function, {@code IWM} for the local input watermark, + * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection + * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right, + * and we write {@code [ ... ]} to denote a bounded window with implied lower bound. + * + * <p>Note that the GCWM will be the same as the IWM if {@code getAllowedLateness} + * is {@code ZERO}. + * + * <p>Here are the cases we need to handle. They are conceptually considered in the + * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM. + * <ol> + * <li>(Normal) + * <pre> + * | + * [ | E ] + * | + * IWM + * </pre> + * This is, hopefully, the common and happy case. The element is locally on-time and can + * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer + * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's + * timestamp (depending on the output time function). Thus the OWM will not proceed past E + * until the next pane fires. + * + * <li>(Discard - no target window) + * <pre> + * | | + * [ E ] | | + * | | + * GCWM <-getAllowedLateness-> IWM + * </pre> + * The element is very locally late. The window has been garbage collected, thus there + * is no target pane E could be assigned to. We discard E. + * + * <li>(Unobservably late) + * <pre> + * | | + * [ | E | ] + * | | + * OWM IWM + * </pre> + * The element is locally late, however we can still treat this case as for 'Normal' above + * since the IWM has not yet passed the end of the window and the element is ahead of the + * OWM. In effect, we get to 'launder' the locally late element and consider it as locally + * on-time because no downstream computation can observe the difference. 
+ * + * <li>(Maybe late 1) + * <pre> + * | | + * [ | E ] | + * | | + * OWM IWM + * </pre> + * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME} + * pane may have already been emitted. However, if timer firings have been delayed then it + * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element + * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find + * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's + * timestamp. We may however set a garbage collection hold if required. + * + * <li>(Maybe late 2) + * <pre> + * | | + * [ E | | ] + * | | + * OWM IWM + * </pre> + * The end-of-window timer has not yet fired, so this element may still appear in an + * {@code ON_TIME} pane. However the element is too late to contribute to the output + * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an + * end-of-window hold. + * + * <li>(Maybe late 3) + * <pre> + * | | + * [ E | ] | + * | | + * OWM IWM + * </pre> + * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer + * has already fired, or it is about to fire. We can place only the garbage collection hold, + * if required. + * + * <li>(Definitely late) + * <pre> + * | | + * [ E ] | | + * | | + * OWM IWM + * </pre> + * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to + * place an end-of-window hold. We can still place a garbage collection hold if required. + * + * </ol> + */ + @Nullable + public Instant addHolds(ReduceFn<?, ?, ?, W>.ProcessValueContext context) { + Instant hold = addElementHold(context); + if (hold == null) { + hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/); + } + return hold; + } + + /** + * Return {@code timestamp}, possibly shifted forward in time according to the window + * strategy's output time function. 
+ */ + private Instant shift(Instant timestamp, W window) { + Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); + checkState(!shifted.isBefore(timestamp), + "OutputTimeFn moved element from %s to earlier time %s for window %s", + timestamp, shifted, window); + checkState(timestamp.isAfter(window.maxTimestamp()) + || !shifted.isAfter(window.maxTimestamp()), + "OutputTimeFn moved element from %s to %s which is beyond end of " + + "window %s", + timestamp, shifted, window); + + return shifted; + } + + /** + * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was + * added (ie the element timestamp plus any forward shift requested by the + * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added. + * The hold is only added if both: + * <ol> + * <li>The backend will be able to respect it. In other words the output watermark cannot + * be ahead of the proposed hold time. + * <li>A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the + * window. In other words the input watermark cannot be ahead of the end of the window. + * </ol> + * The hold ensures the pane which incorporates the element will not be considered late by + * any downstream computation when it is eventually emitted. + */ + @Nullable + private Instant addElementHold(ReduceFn<?, ?, ?, W>.ProcessValueContext context) { + // Give the window function a chance to move the hold timestamp forward to encourage progress. + // (A later hold implies less impediment to the output watermark making progress, which in + // turn encourages end-of-window triggers to fire earlier in following computations.) + Instant elementHold = shift(context.timestamp(), context.window()); + + Instant outputWM = timerInternals.currentOutputWatermarkTime(); + Instant inputWM = timerInternals.currentInputWatermarkTime(); + + String which; + boolean tooLate; + // TODO: These case labels could be tightened. 
+ // See the case analysis in addHolds above for the motivation. + if (outputWM != null && elementHold.isBefore(outputWM)) { + which = "too late to effect output watermark"; + tooLate = true; + } else if (context.window().maxTimestamp().isBefore(inputWM)) { + which = "too late for end-of-window timer"; + tooLate = true; + } else { + which = "on time"; + tooLate = false; + checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Element hold %s is beyond end-of-time", elementHold); + context.state().access(elementHoldTag).add(elementHold); + } + WindowTracing.trace( + "WatermarkHold.addHolds: element hold at {} is {} for " + + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + elementHold, which, context.key(), context.window(), inputWM, + outputWM); + + return tooLate ? null : elementHold; + } + + /** + * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required). + * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added. + */ + @Nullable + private Instant addEndOfWindowOrGarbageCollectionHolds( + ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { + Instant hold = addEndOfWindowHold(context, paneIsEmpty); + if (hold == null) { + hold = addGarbageCollectionHold(context, paneIsEmpty); + } + return hold; + } + + /** + * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added + * (ie the end of window time), or {@literal null} if no end of window hold is possible and we + * should fallback to a garbage collection hold. + * + * <p>We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner}) + * to clear it. In other words, the input watermark cannot be ahead of the end of window time. 
+ * + * <p>An end-of-window hold is added in two situations: + * <ol> + * <li>An incoming element came in behind the output watermark (so we are too late for placing + * the usual element hold), but it may still be possible to include the element in an + * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will + * not be considered late by any downstream computation. + * <li>We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at + * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in + * a pane are processed due to a fired trigger we must set both an end of window timer and an end + * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered + * late by any downstream computation. + * </ol> + */ + @Nullable + private Instant addEndOfWindowHold(ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { + Instant outputWM = timerInternals.currentOutputWatermarkTime(); + Instant inputWM = timerInternals.currentInputWatermarkTime(); + Instant eowHold = context.window().maxTimestamp(); + + if (eowHold.isBefore(inputWM)) { + WindowTracing.trace( + "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for " + + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + eowHold, context.key(), context.window(), inputWM, outputWM); + return null; + } + + checkState(outputWM == null || !eowHold.isBefore(outputWM), + "End-of-window hold %s cannot be before output watermark %s", + eowHold, outputWM); + checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "End-of-window hold %s is beyond end-of-time", eowHold); + // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep + // the hold away from the combining function in elementHoldTag. + // However if !paneIsEmpty then it could make sense to use the elementHoldTag here. 
+ // Alas, onMerge is forced to add an end of window or garbage collection hold without + // knowing whether an element hold is already in place (stopping to check is too expensive). + // Thus it would end up adding an element hold at the end of the window which could + // upset the elementHoldTag combining function. + context.state().access(EXTRA_HOLD_TAG).add(eowHold); + WindowTracing.trace( + "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for " + + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + eowHold, context.key(), context.window(), inputWM, outputWM); + return eowHold; + } + + /** + * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at + * which the hold was added (ie the end of window time plus allowed lateness), + * or {@literal null} if no hold was added. + * + * <p>We only add the hold if it is distinct from what would be added by + * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness} + * must be non-zero. + * + * <p>A garbage collection hold is added in two situations: + * <ol> + * <li>An incoming element came in behind the output watermark, and was too late for placing + * the usual element hold or an end of window hold. Place the garbage collection hold so that + * we can guarantee when the pane is finally triggered its output will not be dropped due to + * excessive lateness by any downstream computation. + * <li>The {@link WindowingStrategy#getClosingBehavior()} is + * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted + * for all windows which saw at least one element. Again, the garbage collection hold guarantees + * that any empty final pane can be given a timestamp which will not be considered beyond + * allowed lateness by any downstream computation. + * </ol> + * + * <p>We use {@code paneIsEmpty} to distinguish cases 1 and 2. 
+ */ + @Nullable + private Instant addGarbageCollectionHold( + ReduceFn<?, ?, ?, W>.Context context, boolean paneIsEmpty) { + Instant outputWM = timerInternals.currentOutputWatermarkTime(); + Instant inputWM = timerInternals.currentInputWatermarkTime(); + Instant eow = context.window().maxTimestamp(); + Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness()); + + if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) { + WindowTracing.trace( + "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " + + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; " + + "outputWatermark:{}", + gcHold, context.key(), context.window(), inputWM, outputWM); + return null; + } + + if (paneIsEmpty && context.windowingStrategy().getClosingBehavior() + == ClosingBehavior.FIRE_IF_NON_EMPTY) { + WindowTracing.trace( + "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " + + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; " + + "outputWatermark:{}", + gcHold, context.key(), context.window(), inputWM, outputWM); + return null; + } + + checkState(!gcHold.isBefore(inputWM), + "Garbage collection hold %s cannot be before input watermark %s", + gcHold, inputWM); + checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), + "Garbage collection hold %s is beyond end-of-time", gcHold); + // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above. + context.state().access(EXTRA_HOLD_TAG).add(gcHold); + + WindowTracing.trace( + "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for " + + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + gcHold, context.key(), context.window(), inputWM, outputWM); + return gcHold; + } + + /** + * Prefetch watermark holds in preparation for merging. 
+ */ + public void prefetchOnMerge(MergingStateAccessor<?, W> state) { + StateMerging.prefetchWatermarks(state, elementHoldTag); + } + + /** + * Updates the watermark hold when windows merge if it is possible the merged value does + * not equal all of the existing holds. For example, if the new window implies a later + * watermark hold, then earlier holds may be released. + */ + public void onMerge(ReduceFn<?, ?, ?, W>.OnMergeContext context) { + WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; " + + "outputWatermark:{}", + context.key(), context.window(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window()); + // If we had a cheap way to determine if we have an element hold then we could + // avoid adding an unnecessary end-of-window or garbage collection hold. + // Simply reading the above merged watermark would impose an additional read for the + // common case that the active window has just one underlying state address window and + // the hold depends on the min of the element timestamps. + // At least one merged window must be non-empty for the merge to have been triggered. + StateMerging.clear(context.state(), EXTRA_HOLD_TAG); + addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/); + } + + /** + * Result of {@link #extractAndRelease}. + */ + public static class OldAndNewHolds { + public final Instant oldHold; + @Nullable + public final Instant newHold; + + public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { + this.oldHold = oldHold; + this.newHold = newHold; + } + } + + /** + * Return (a future for) the earliest hold for {@code context}. Clear all the holds after + * reading, but add/restore an end-of-window or garbage collection hold if required. 
+ * + * <p>The returned timestamp is the output timestamp according to the {@link OutputTimeFn} + * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late + * elements in the current pane. If there is no such value the timestamp is the end + * of the window. + */ + public ReadableState<OldAndNewHolds> extractAndRelease( + final ReduceFn<?, ?, ?, W>.Context context, final boolean isFinished) { + WindowTracing.debug( + "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; " + + "outputWatermark:{}", + context.key(), context.window(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + final WatermarkHoldState<W> elementHoldState = context.state().access(elementHoldTag); + final WatermarkHoldState<BoundedWindow> extraHoldState = context.state().access(EXTRA_HOLD_TAG); + return new ReadableState<OldAndNewHolds>() { + @Override + @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", + justification = "") + public ReadableState<OldAndNewHolds> readLater() { + elementHoldState.readLater(); + extraHoldState.readLater(); + return this; + } + + @Override + public OldAndNewHolds read() { + // Read both the element and extra holds. + Instant elementHold = elementHoldState.read(); + Instant extraHold = extraHoldState.read(); + Instant oldHold; + // Find the minimum, accounting for null. + if (elementHold == null) { + oldHold = extraHold; + } else if (extraHold == null) { + oldHold = elementHold; + } else if (elementHold.isBefore(extraHold)) { + oldHold = elementHold; + } else { + oldHold = extraHold; + } + if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { + // If no hold (eg because all elements came in behind the output watermark), or + // the hold was for garbage collection, take the end of window as the result. 
+ WindowTracing.debug( + "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " + + "for key:{}; window:{}", + oldHold, context.key(), context.window()); + oldHold = context.window().maxTimestamp(); + } + WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", + context.key(), context.window()); + + // Clear the underlying state to allow the output watermark to progress. + elementHoldState.clear(); + extraHoldState.clear(); + + @Nullable Instant newHold = null; + if (!isFinished) { + // Only need to leave behind an end-of-window or garbage collection hold + // if future elements will be processed. + newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/); + } + + return new OldAndNewHolds(oldHold, newHold); + } + }; + } + + /** + * Clear any remaining holds. + */ + public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) { + WindowTracing.debug( + "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + context.key(), context.window(), timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + context.state().access(elementHoldTag).clear(); + context.state().access(EXTRA_HOLD_TAG).clear(); + } + + /** + * Return the current data hold, or null if none. Does not clear. For debugging only. 
+ */ + @Nullable + public Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) { + return context.state().access(elementHoldTag).read(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java deleted file mode 100644 index af28052..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindows.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.PCollection; - -/** - * {@link PTransform} that uses privileged (non-user-facing) APIs to assign elements of a - * {@link PCollection} to windows according to the provided {@link WindowFn}. 
- * - * @param <T> Type of elements being windowed - * @param <W> Window type - */ -public class AssignWindows<T, W extends BoundedWindow> - extends PTransform<PCollection<T>, PCollection<T>> { - - private WindowFn<? super T, W> fn; - - public AssignWindows(WindowFn<? super T, W> fn) { - this.fn = fn; - } - - @Override - public PCollection<T> apply(PCollection<T> input) { - return input.apply("AssignWindows", ParDo.of(new AssignWindowsDoFn<>(fn))); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java deleted file mode 100644 index 7e26253..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/AssignWindowsDoFn.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.joda.time.Instant; - -/** - * {@link OldDoFn} that tags elements of a {@link PCollection} with windows, according to the - * provided {@link WindowFn}. - * - * @param <T> Type of elements being windowed - * @param <W> Window type - */ -@SystemDoFnInternal -public class AssignWindowsDoFn<T, W extends BoundedWindow> extends OldDoFn<T, T> - implements RequiresWindowAccess { - private WindowFn<? super T, W> fn; - - public AssignWindowsDoFn(WindowFn<? super T, W> fn) { - this.fn = - checkNotNull( - fn, - "%s provided to %s cannot be null", - WindowFn.class.getSimpleName(), - AssignWindowsDoFn.class.getSimpleName()); - } - - @Override - @SuppressWarnings("unchecked") - public void processElement(final ProcessContext c) throws Exception { - Collection<W> windows = - ((WindowFn<T, W>) fn).assignWindows( - ((WindowFn<T, W>) fn).new AssignContext() { - @Override - public T element() { - return c.element(); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(c.windowingInternals().windows()); - } - }); - - c.windowingInternals() - .outputWindowedValue(c.element(), c.timestamp(), windows, PaneInfo.NO_FIRING); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java ---------------------------------------------------------------------- diff --git 
a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java deleted file mode 100644 index f3e84a6..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/BatchTimerInternals.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.MoreObjects; -import java.util.HashSet; -import java.util.PriorityQueue; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.joda.time.Instant; - -/** - * TimerInternals that uses priority queues to manage the timers that are ready to fire. - */ -public class BatchTimerInternals implements TimerInternals { - /** Set of scheduled timers, used for deduplicating timers. */ - private Set<TimerData> existingTimers = new HashSet<>(); - - // Keep these queues separate so we can advance over them separately.
- private PriorityQueue<TimerData> watermarkTimers = new PriorityQueue<>(11); - private PriorityQueue<TimerData> processingTimers = new PriorityQueue<>(11); - - private Instant inputWatermarkTime; - private Instant processingTime; - - private PriorityQueue<TimerData> queue(TimeDomain domain) { - return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers; - } - - public BatchTimerInternals(Instant processingTime) { - this.processingTime = processingTime; - this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - @Override - public void setTimer(TimerData timer) { - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); - } - } - - @Override - public void deleteTimer(TimerData timer) { - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - /** - * {@inheritDoc} - * - * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing - * is already complete. - */ - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public Instant currentInputWatermarkTime() { - return inputWatermarkTime; - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - // The output watermark is always undefined in batch mode. 
- return null; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTimers) - .toString(); - } - - public void advanceInputWatermark(ReduceFnRunner<?, ?, ?, ?> runner, Instant newInputWatermark) - throws Exception { - checkState(!newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, - newInputWatermark); - inputWatermarkTime = newInputWatermark; - advance(runner, newInputWatermark, TimeDomain.EVENT_TIME); - } - - public void advanceProcessingTime(ReduceFnRunner<?, ?, ?, ?> runner, Instant newProcessingTime) - throws Exception { - checkState(!newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); - processingTime = newProcessingTime; - advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME); - } - - private void advance(ReduceFnRunner<?, ?, ?, ?> runner, Instant newTime, TimeDomain domain) - throws Exception { - PriorityQueue<TimerData> timers = queue(domain); - boolean shouldFire = false; - - do { - TimerData timer = timers.peek(); - // Timers fire if the new time is ahead of the timer - shouldFire = timer != null && newTime.isAfter(timer.getTimestamp()); - if (shouldFire) { - // Remove before firing, so that if the trigger adds another identical - // timer we don't remove it. 
- timers.remove(); - runner.onTimer(timer); - } - } while (shouldFire); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java deleted file mode 100644 index 49206d1..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunner.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext; -import org.apache.beam.sdk.values.KV; - -/** - * A wrapper interface that represents the execution of a {@link OldDoFn}. - */ -public interface DoFnRunner<InputT, OutputT> { - /** - * Prepares and calls {@link OldDoFn#startBundle}.
- */ - public void startBundle(); - - /** - * Calls {@link OldDoFn#processElement} with a {@link ProcessContext} containing the current - * element. - */ - public void processElement(WindowedValue<InputT> elem); - - /** - * Calls {@link OldDoFn#finishBundle} and performs additional tasks, such as - * flushing in-memory states. - */ - public void finishBundle(); - - /** - * An internal interface for signaling that a {@link OldDoFn} requires late data dropping. - */ - public interface ReduceFnExecutor<K, InputT, OutputT, W> { - /** - * Gets this object as a {@link OldDoFn}. - * - * Most implementors of this interface are expected to be {@link OldDoFn} instances, and will - * return themselves. - */ - OldDoFn<KeyedWorkItem<K, InputT>, KV<K, OutputT>> asDoFn(); - - /** - * Returns an aggregator that tracks elements that are dropped due to being late. - */ - Aggregator<Long, Long> getDroppedDueToLatenessAggregator(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java deleted file mode 100644 index f0cfd74..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java +++ /dev/null @@ -1,551 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.DoFnRunners.OutputManager; -import org.apache.beam.sdk.util.ExecutionContext.StepContext; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; -import org.joda.time.format.PeriodFormat; - -/** - * A 
base implementation of {@link DoFnRunner}. - * - * <p> Sub-classes should override {@link #invokeProcessElement}. - */ -public abstract class DoFnRunnerBase<InputT, OutputT> implements DoFnRunner<InputT, OutputT> { - - /** The {@link OldDoFn} being run. */ - public final OldDoFn<InputT, OutputT> fn; - - /** The context used for running the {@link OldDoFn}. */ - public final DoFnContext<InputT, OutputT> context; - - protected DoFnRunnerBase( - PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - this.fn = fn; - this.context = new DoFnContext<>( - options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy == null ? null : windowingStrategy.getWindowFn()); - } - - /** - * An implementation of {@code OutputManager} using simple lists, for testing and in-memory - * contexts such as the {@code DirectRunner}. 
- */ - public static class ListOutputManager implements OutputManager { - - private Map<TupleTag<?>, List<WindowedValue<?>>> outputLists = Maps.newHashMap(); - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - @SuppressWarnings({"rawtypes", "unchecked"}) - List<WindowedValue<T>> outputList = (List) outputLists.get(tag); - - if (outputList == null) { - outputList = Lists.newArrayList(); - @SuppressWarnings({"rawtypes", "unchecked"}) - List<WindowedValue<?>> untypedList = (List) outputList; - outputLists.put(tag, untypedList); - } - - outputList.add(output); - } - - public <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) { - // Safe cast by design, inexpressible in Java without rawtypes - @SuppressWarnings({"rawtypes", "unchecked"}) - List<WindowedValue<T>> outputList = (List) outputLists.get(tag); - return (outputList != null) ? outputList : Collections.<WindowedValue<T>>emptyList(); - } - } - - @Override - public void startBundle() { - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.startBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - @Override - public void processElement(WindowedValue<InputT> elem) { - if (elem.getWindows().size() <= 1 - || (!RequiresWindowAccess.class.isAssignableFrom(fn.getClass()) - && context.sideInputReader.isEmpty())) { - invokeProcessElement(elem); - } else { - // We could modify the windowed value (and the processContext) to - // avoid repeated allocations, but this is more straightforward. - for (WindowedValue<InputT> windowedValue : elem.explodeWindows()) { - invokeProcessElement(windowedValue); - } - } - } - - /** - * Invokes {@link OldDoFn#processElement} after certain pre-processing has been done in - * {@link DoFnRunnerBase#processElement}.
- */ - protected abstract void invokeProcessElement(WindowedValue<InputT> elem); - - @Override - public void finishBundle() { - // This can contain user code. Wrap it in case it throws an exception. - try { - fn.finishBundle(context); - } catch (Throwable t) { - // Exception in user code. - throw wrapUserCodeException(t); - } - } - - /** - * A concrete implementation of {@code OldDoFn.Context} used for running a {@link OldDoFn}. - * - * @param <InputT> the type of the {@link OldDoFn} (main) input elements - * @param <OutputT> the type of the {@link OldDoFn} (main) output elements - */ - private static class DoFnContext<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.Context { - private static final int MAX_SIDE_OUTPUTS = 1000; - - final PipelineOptions options; - final OldDoFn<InputT, OutputT> fn; - final SideInputReader sideInputReader; - final OutputManager outputManager; - final TupleTag<OutputT> mainOutputTag; - final StepContext stepContext; - final AggregatorFactory aggregatorFactory; - final WindowFn<?, ?> windowFn; - - /** - * The set of known output tags, some of which may be undeclared, so we can throw an - * exception when it exceeds {@link #MAX_SIDE_OUTPUTS}. 
- */ - private Set<TupleTag<?>> outputTags; - - public DoFnContext(PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowFn<?, ?> windowFn) { - fn.super(); - this.options = options; - this.fn = fn; - this.sideInputReader = sideInputReader; - this.outputManager = outputManager; - this.mainOutputTag = mainOutputTag; - this.outputTags = Sets.newHashSet(); - - outputTags.add(mainOutputTag); - for (TupleTag<?> sideOutputTag : sideOutputTags) { - outputTags.add(sideOutputTag); - } - - this.stepContext = stepContext; - this.aggregatorFactory = aggregatorFactory; - this.windowFn = windowFn; - super.setupDelegateAggregators(); - } - - ////////////////////////////////////////////////////////////////////////////// - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - <T, W extends BoundedWindow> WindowedValue<T> makeWindowedValue( - T output, Instant timestamp, Collection<W> windows, PaneInfo pane) { - final Instant inputTimestamp = timestamp; - - if (timestamp == null) { - timestamp = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - if (windows == null) { - try { - // The windowFn can never succeed at accessing the element, so its type does not - // matter here - @SuppressWarnings("unchecked") - WindowFn<Object, W> objectWindowFn = (WindowFn<Object, W>) windowFn; - windows = objectWindowFn.assignWindows(objectWindowFn.new AssignContext() { - @Override - public Object element() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input element when none was available"); - } - - @Override - public Instant timestamp() { - if (inputTimestamp == null) { - throw new UnsupportedOperationException( - "WindowFn attempted to access input timestamp when none was available"); - } - return inputTimestamp; - } - - @Override - 
public W window() { - throw new UnsupportedOperationException( - "WindowFn attempted to access input windows when none were available"); - } - }); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - } - - return WindowedValue.of(output, timestamp, windows, pane); - } - - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - if (!sideInputReader.contains(view)) { - throw new IllegalArgumentException("calling sideInput() with unknown view"); - } - BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); - return sideInputReader.get(view, sideInputWindow); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWindowedValue(makeWindowedValue(output, timestamp, windows, pane)); - } - - void outputWindowedValue(WindowedValue<OutputT> windowedElem) { - outputManager.output(mainOutputTag, windowedElem); - if (stepContext != null) { - stepContext.noteOutput(windowedElem); - } - } - - protected <T> void sideOutputWindowedValue(TupleTag<T> tag, - T output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - sideOutputWindowedValue(tag, makeWindowedValue(output, timestamp, windows, pane)); - } - - protected <T> void sideOutputWindowedValue(TupleTag<T> tag, WindowedValue<T> windowedElem) { - if (!outputTags.contains(tag)) { - // This tag wasn't declared nor was it seen before during this execution. - // Thus, this must be a new, undeclared and unconsumed output. - // To prevent likely user errors, enforce the limit on the number of side - // outputs. 
- if (outputTags.size() >= MAX_SIDE_OUTPUTS) { - throw new IllegalArgumentException( - "the number of side outputs has exceeded a limit of " + MAX_SIDE_OUTPUTS); - } - outputTags.add(tag); - } - - outputManager.output(tag, windowedElem); - if (stepContext != null) { - stepContext.noteSideOutput(tag, windowedElem); - } - } - - // Following implementations of output, outputWithTimestamp, and sideOutput - // are only accessible in OldDoFn.startBundle and OldDoFn.finishBundle, and will be shadowed by - // ProcessContext's versions in OldDoFn.processElement. - @Override - public void output(OutputT output) { - outputWindowedValue(output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - outputWindowedValue(output, timestamp, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - checkNotNull(tag, "TupleTag passed to sideOutput cannot be null"); - sideOutputWindowedValue(tag, output, null, null, PaneInfo.NO_FIRING); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "TupleTag passed to sideOutputWithTimestamp cannot be null"); - sideOutputWindowedValue(tag, output, timestamp, null, PaneInfo.NO_FIRING); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal( - String name, CombineFn<AggInputT, ?, AggOutputT> combiner) { - checkNotNull(combiner, "Combiner passed to createAggregatorInternal cannot be null"); - return aggregatorFactory.createAggregatorForDoFn(fn.getClass(), stepContext, name, combiner); - } - } - - /** - * Returns a new {@link OldDoFn.ProcessContext} for the given element. 
- */ - protected OldDoFn<InputT, OutputT>.ProcessContext createProcessContext( - WindowedValue<InputT> elem) { - return new DoFnProcessContext<InputT, OutputT>(fn, context, elem); - } - - protected RuntimeException wrapUserCodeException(Throwable t) { - throw UserCodeException.wrapIf(!isSystemDoFn(), t); - } - - private boolean isSystemDoFn() { - return fn.getClass().isAnnotationPresent(SystemDoFnInternal.class); - } - - /** - * A concrete implementation of {@link OldDoFn.ProcessContext} used for - * running a {@link OldDoFn} over a single element. - * - * @param <InputT> the type of the {@link OldDoFn} (main) input elements - * @param <OutputT> the type of the {@link OldDoFn} (main) output elements - */ - static class DoFnProcessContext<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.ProcessContext { - - - final OldDoFn<InputT, OutputT> fn; - final DoFnContext<InputT, OutputT> context; - final WindowedValue<InputT> windowedValue; - - public DoFnProcessContext(OldDoFn<InputT, OutputT> fn, - DoFnContext<InputT, OutputT> context, - WindowedValue<InputT> windowedValue) { - fn.super(); - this.fn = fn; - this.context = context; - this.windowedValue = windowedValue; - } - - @Override - public PipelineOptions getPipelineOptions() { - return context.getPipelineOptions(); - } - - @Override - public InputT element() { - return windowedValue.getValue(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - checkNotNull(view, "View passed to sideInput cannot be null"); - Iterator<? 
extends BoundedWindow> windowIter = windows().iterator(); - BoundedWindow window; - if (!windowIter.hasNext()) { - if (context.windowFn instanceof GlobalWindows) { - // TODO: Remove this once GroupByKeyOnly no longer outputs elements - // without windows - window = GlobalWindow.INSTANCE; - } else { - throw new IllegalStateException( - "sideInput called when main input element is not in any windows"); - } - } else { - window = windowIter.next(); - if (windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is in multiple windows"); - } - } - return context.sideInput(view, window); - } - - @Override - public BoundedWindow window() { - if (!(fn instanceof RequiresWindowAccess)) { - throw new UnsupportedOperationException( - "window() is only available in the context of a OldDoFn marked as" - + "RequiresWindowAccess."); - } - return Iterables.getOnlyElement(windows()); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public void output(OutputT output) { - context.outputWindowedValue(windowedValue.withValue(output)); - } - - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(timestamp); - context.outputWindowedValue(output, timestamp, - windowedValue.getWindows(), windowedValue.getPane()); - } - - void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? 
extends BoundedWindow> windows, - PaneInfo pane) { - context.outputWindowedValue(output, timestamp, windows, pane); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - checkNotNull(tag, "Tag passed to sideOutput cannot be null"); - context.sideOutputWindowedValue(tag, windowedValue.withValue(output)); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - checkNotNull(tag, "Tag passed to sideOutputWithTimestamp cannot be null"); - checkTimestamp(timestamp); - context.sideOutputWindowedValue( - tag, output, timestamp, windowedValue.getWindows(), windowedValue.getPane()); - } - - @Override - public Instant timestamp() { - return windowedValue.getTimestamp(); - } - - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - private void checkTimestamp(Instant timestamp) { - if (timestamp.isBefore(windowedValue.getTimestamp().minus(fn.getAllowedTimestampSkew()))) { - throw new IllegalArgumentException(String.format( - "Cannot output with timestamp %s. Output timestamps must be no earlier than the " - + "timestamp of the current input (%s) minus the allowed skew (%s). See the " - + "OldDoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.", - timestamp, windowedValue.getTimestamp(), - PeriodFormat.getDefault().print(fn.getAllowedTimestampSkew().toPeriod()))); - } - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - return new WindowingInternals<InputT, OutputT>() { - @Override - public void outputWindowedValue(OutputT output, Instant timestamp, - Collection<? extends BoundedWindow> windows, PaneInfo pane) { - context.outputWindowedValue(output, timestamp, windows, pane); - } - - @Override - public Collection<? 
extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public TimerInternals timerInternals() { - return context.stepContext.timerInternals(); - } - - @Override - public <T> void writePCollectionViewData( - TupleTag<?> tag, - Iterable<WindowedValue<T>> data, - Coder<T> elemCoder) throws IOException { - @SuppressWarnings("unchecked") - Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) context.windowFn.windowCoder(); - - context.stepContext.writePCollectionViewData( - tag, data, IterableCoder.of(WindowedValue.getFullCoder(elemCoder, windowCoder)), - window(), windowCoder); - } - - @Override - public StateInternals<?> stateInternals() { - return context.stepContext.stateInternals(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - return context.sideInput(view, mainInputWindow); - } - }; - } - - @Override - protected <AggregatorInputT, AggregatorOutputT> Aggregator<AggregatorInputT, AggregatorOutputT> - createAggregatorInternal( - String name, CombineFn<AggregatorInputT, ?, AggregatorOutputT> combiner) { - return context.createAggregatorInternal(name, combiner); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java deleted file mode 100644 index c4df7b2..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/DoFnRunners.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.util.List; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.DoFnRunner.ReduceFnExecutor; -import org.apache.beam.sdk.util.ExecutionContext.StepContext; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TupleTag; - -/** - * Static utility methods that provide {@link DoFnRunner} implementations. - */ -public class DoFnRunners { - /** - * Information about how to create output receivers and output to them. - */ - public interface OutputManager { - /** - * Outputs a single element to the receiver indicated by the given {@link TupleTag}. - */ - public <T> void output(TupleTag<T> tag, WindowedValue<T> output); - } - - /** - * Returns a basic implementation of {@link DoFnRunner} that works for most {@link OldDoFn DoFns}. - * - * <p>It invokes {@link OldDoFn#processElement} for each input. 
- */ - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> simpleRunner( - PipelineOptions options, - OldDoFn<InputT, OutputT> fn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - return new SimpleDoFnRunner<>( - options, - fn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } - - /** - * Returns an implementation of {@link DoFnRunner} that handles late data dropping. - * - * <p>It drops elements from expired windows before they reach the underlying {@link OldDoFn}. - */ - public static <K, InputT, OutputT, W extends BoundedWindow> - DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> lateDataDroppingRunner( - PipelineOptions options, - ReduceFnExecutor<K, InputT, OutputT, W> reduceFnExecutor, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<KV<K, OutputT>> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - AggregatorFactory aggregatorFactory, - WindowingStrategy<?, W> windowingStrategy) { - DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> simpleDoFnRunner = - simpleRunner( - options, - reduceFnExecutor.asDoFn(), - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - return new LateDataDroppingDoFnRunner<>( - simpleDoFnRunner, - windowingStrategy, - stepContext.timerInternals(), - reduceFnExecutor.getDroppedDueToLatenessAggregator()); - } - - - public static <InputT, OutputT> DoFnRunner<InputT, OutputT> createDefault( - PipelineOptions options, - OldDoFn<InputT, OutputT> doFn, - SideInputReader sideInputReader, - OutputManager outputManager, - TupleTag<OutputT> mainOutputTag, - List<TupleTag<?>> sideOutputTags, - StepContext stepContext, - 
AggregatorFactory aggregatorFactory, - WindowingStrategy<?, ?> windowingStrategy) { - if (doFn instanceof ReduceFnExecutor) { - @SuppressWarnings("rawtypes") - ReduceFnExecutor fn = (ReduceFnExecutor) doFn; - @SuppressWarnings({"unchecked", "cast", "rawtypes"}) - DoFnRunner<InputT, OutputT> runner = (DoFnRunner<InputT, OutputT>) lateDataDroppingRunner( - options, - fn, - sideInputReader, - outputManager, - (TupleTag) mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - (WindowingStrategy) windowingStrategy); - return runner; - } - return simpleRunner( - options, - doFn, - sideInputReader, - outputManager, - mainOutputTag, - sideOutputTags, - stepContext, - aggregatorFactory, - windowingStrategy); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java deleted file mode 100644 index f386dfb..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsDoFn.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.state.StateInternalsFactory; -import org.apache.beam.sdk.values.KV; - -/** - * {@link OldDoFn} that merges windows and groups elements in those windows, optionally - * combining values. - * - * @param <K> key type - * @param <InputT> input value element type - * @param <OutputT> output value element type - * @param <W> window type - */ -@SystemDoFnInternal -public abstract class GroupAlsoByWindowsDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>> { - public static final String DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER = "DroppedDueToClosedWindow"; - public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "DroppedDueToLateness"; - - protected final Aggregator<Long, Long> droppedDueToClosedWindow = - createAggregator(DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, new Sum.SumLongFn()); - protected final Aggregator<Long, Long> droppedDueToLateness = - createAggregator(DROPPED_DUE_TO_LATENESS_COUNTER, new Sum.SumLongFn()); - - /** - * Create the default {@link GroupAlsoByWindowsDoFn}, which uses window sets to implement the - * grouping. 
- * - * @param windowingStrategy The window function and trigger to use for grouping - * @param inputCoder the input coder to use - */ - public static <K, V, W extends BoundedWindow> - GroupAlsoByWindowsDoFn<K, V, Iterable<V>, W> createDefault( - WindowingStrategy<?, W> windowingStrategy, - StateInternalsFactory<K> stateInternalsFactory, - Coder<V> inputCoder) { - return new GroupAlsoByWindowsViaOutputBufferDoFn<>( - windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, V, W>buffering(inputCoder)); - } -}