http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java deleted file mode 100644 index cc418da..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.collect.Iterables; -import java.util.List; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; -import org.apache.beam.sdk.values.KV; -import org.joda.time.Instant; - -/** - * The default batch {@link GroupAlsoByWindowsDoFn} implementation, if no specialized "fast path" - * implementation is applicable. 
- */ -@SystemDoFnInternal -public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends BoundedWindow> - extends GroupAlsoByWindowsDoFn<K, InputT, OutputT, W> { - - private final WindowingStrategy<?, W> strategy; - private final StateInternalsFactory<K> stateInternalsFactory; - private SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn; - - public GroupAlsoByWindowsViaOutputBufferDoFn( - WindowingStrategy<?, W> windowingStrategy, - StateInternalsFactory<K> stateInternalsFactory, - SystemReduceFn<K, InputT, ?, OutputT, W> reduceFn) { - this.strategy = windowingStrategy; - this.reduceFn = reduceFn; - this.stateInternalsFactory = stateInternalsFactory; - } - - @Override - public void processElement( - OldDoFn<KV<K, Iterable<WindowedValue<InputT>>>, KV<K, OutputT>>.ProcessContext c) - throws Exception { - K key = c.element().getKey(); - // Used with Batch, we know that all the data is available for this key. We can't use the - // timer manager from the context because it doesn't exist. So we create one and emulate the - // watermark, knowing that we have all data and it is in timestamp order. - BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now()); - StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key); - - ReduceFnRunner<K, InputT, OutputT, W> reduceFnRunner = - new ReduceFnRunner<K, InputT, OutputT, W>( - key, - strategy, - stateInternals, - timerInternals, - c.windowingInternals(), - droppedDueToClosedWindow, - reduceFn, - c.getPipelineOptions()); - - Iterable<List<WindowedValue<InputT>>> chunks = - Iterables.partition(c.element().getValue(), 1000); - for (Iterable<WindowedValue<InputT>> chunk : chunks) { - // Process the chunk of elements. - reduceFnRunner.processElements(chunk); - - // Then, since elements are sorted by their timestamp, advance the input watermark - // to the first element, and fire any timers that may have been scheduled. 
- timerInternals.advanceInputWatermark(reduceFnRunner, chunk.iterator().next().getTimestamp()); - - // Fire any processing timers that need to fire - timerInternals.advanceProcessingTime(reduceFnRunner, Instant.now()); - - // Leave the output watermark undefined. Since there's no late data in batch mode - // there's really no need to track it as we do for streaming. - } - - // Finish any pending windows by advancing the input watermark to infinity. - timerInternals.advanceInputWatermark(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE); - - // Finally, advance the processing time to infinity to fire any timers. - timerInternals.advanceProcessingTime(reduceFnRunner, BoundedWindow.TIMESTAMP_MAX_VALUE); - - reduceFnRunner.persist(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java deleted file mode 100644 index fdad17a..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/GroupByKeyViaGroupByKeyOnly.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; -import org.apache.beam.sdk.util.state.StateInternalsFactory; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * An implementation of {@link GroupByKey} built on top of a lower-level {@link GroupByKeyOnly} - * primitive. - * - * <p>This implementation of {@link GroupByKey} proceeds via the following steps: - * <ol> - * <li>{@code ReifyTimestampsAndWindowsDoFn ParDo(ReifyTimestampsAndWindows)}: This embeds - * the previously-implicit timestamp and window into the elements themselves, so a - * window-and-timestamp-unaware transform can operate on them.</li> - * <li>{@code GroupByKeyOnly}: This lower-level primitive groups by keys, ignoring windows - * and timestamps. Many window-unaware runners have such a primitive already.</li> - * <li>{@code SortValuesByTimestamp ParDo(SortValuesByTimestamp)}: The values in the iterables - * output by {@link GroupByKeyOnly} are sorted by timestamp.</li> - * <li>{@code GroupAlsoByWindow}: This primitive processes the sorted values. Today it is - * implemented as a {@link ParDo} that calls reserved internal methods.</li> - * </ol> - * - * <p>This implementation of {@link GroupByKey} has severe limitations unless its component - * transforms are replaced. 
As-is, it is only applicable for in-memory runners using a batch-style - * execution strategy. Specifically: - * - * <ul> - * <li>Every iterable output by {@link GroupByKeyOnly} must contain all elements for that key. - * A streaming-style partition, with multiple elements for the same key, will not yield - * correct results.</li> - * <li>Sorting of values by timestamp is performed on an in-memory list. It will not succeed - * for large iterables.</li> - * <li>The implementation of {@code GroupAlsoByWindow} does not support timers. This is only - * appropriate for runners which also do not support timers.</li> - * </ul> - */ -public class GroupByKeyViaGroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { - - private final GroupByKey<K, V> gbkTransform; - - public GroupByKeyViaGroupByKeyOnly(GroupByKey<K, V> originalTransform) { - this.gbkTransform = originalTransform; - } - - @Override - public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy(); - - return input - // Make each input element's timestamp and assigned windows - // explicit, in the value part. - .apply(new ReifyTimestampsAndWindows<K, V>()) - - // Group by just the key. - // Combiner lifting will not happen regardless of the disallowCombinerLifting value. - // There will be no combiners right after the GroupByKeyOnly because of the two ParDos - // introduced in here. - .apply(new GroupByKeyOnly<K, WindowedValue<V>>()) - - // Sort each key's values by timestamp. GroupAlsoByWindow requires - // its input to be sorted by timestamp. - .apply(new SortValuesByTimestamp<K, V>()) - - // Group each key's values by window, merging windows as needed. - .apply(new GroupAlsoByWindow<K, V>(windowingStrategy)) - - // And update the windowing strategy as appropriate. 
- .setWindowingStrategyInternal( - gbkTransform.updateWindowingStrategy(windowingStrategy)); - } - - /** - * Runner-specific primitive that groups by key only, ignoring any window assignments. A - * runner that uses {@link GroupByKeyViaGroupByKeyOnly} should have a primitive way to translate - * or evaluate this class. - */ - public static class GroupByKeyOnly<K, V> - extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> { - - @SuppressWarnings({"rawtypes", "unchecked"}) - @Override - public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { - return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( - input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - } - - @Override - public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) { - return GroupByKey.getOutputKvCoder(input.getCoder()); - } - } - - /** - * Helper transform that sorts the values associated with each key by timestamp. - */ - private static class SortValuesByTimestamp<K, V> - extends PTransform< - PCollection<KV<K, Iterable<WindowedValue<V>>>>, - PCollection<KV<K, Iterable<WindowedValue<V>>>>> { - @Override - public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply( - PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { - return input - .apply( - ParDo.of( - new OldDoFn< - KV<K, Iterable<WindowedValue<V>>>, - KV<K, Iterable<WindowedValue<V>>>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, Iterable<WindowedValue<V>>> kvs = c.element(); - K key = kvs.getKey(); - Iterable<WindowedValue<V>> unsortedValues = kvs.getValue(); - List<WindowedValue<V>> sortedValues = new ArrayList<>(); - for (WindowedValue<V> value : unsortedValues) { - sortedValues.add(value); - } - Collections.sort( - sortedValues, - new Comparator<WindowedValue<V>>() { - @Override - public int compare(WindowedValue<V> e1, WindowedValue<V> e2) { - return e1.getTimestamp().compareTo(e2.getTimestamp()); - } - }); - 
c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues)); - } - })) - .setCoder(input.getCoder()); - } - } - - /** - * Runner-specific primitive that takes a collection of timestamp-ordered values associated with - * each key, groups the values by window, merges windows as needed, and for each window in each - * key, outputs a collection of key/value-list pairs implicitly assigned to the window and with - * the timestamp derived from that window. - */ - public static class GroupAlsoByWindow<K, V> - extends PTransform< - PCollection<KV<K, Iterable<WindowedValue<V>>>>, PCollection<KV<K, Iterable<V>>>> { - private final WindowingStrategy<?, ?> windowingStrategy; - - public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) { - this.windowingStrategy = windowingStrategy; - } - - public WindowingStrategy<?, ?> getWindowingStrategy() { - return windowingStrategy; - } - - private KvCoder<K, Iterable<WindowedValue<V>>> getKvCoder( - Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) { - // Coder<KV<...>> --> KvCoder<...> - checkArgument(inputCoder instanceof KvCoder, - "%s requires a %s<...> but got %s", - getClass().getSimpleName(), - KvCoder.class.getSimpleName(), - inputCoder); - @SuppressWarnings("unchecked") - KvCoder<K, Iterable<WindowedValue<V>>> kvCoder = - (KvCoder<K, Iterable<WindowedValue<V>>>) inputCoder; - return kvCoder; - } - - public Coder<K> getKeyCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) { - return getKvCoder(inputCoder).getKeyCoder(); - } - - public Coder<V> getValueCoder(Coder<KV<K, Iterable<WindowedValue<V>>>> inputCoder) { - // Coder<Iterable<...>> --> IterableCoder<...> - Coder<Iterable<WindowedValue<V>>> iterableWindowedValueCoder = - getKvCoder(inputCoder).getValueCoder(); - checkArgument(iterableWindowedValueCoder instanceof IterableCoder, - "%s requires a %s<..., %s> but got a %s", - getClass().getSimpleName(), - KvCoder.class.getSimpleName(), - IterableCoder.class.getSimpleName(), - 
iterableWindowedValueCoder); - IterableCoder<WindowedValue<V>> iterableCoder = - (IterableCoder<WindowedValue<V>>) iterableWindowedValueCoder; - - // Coder<WindowedValue<...>> --> WindowedValueCoder<...> - Coder<WindowedValue<V>> iterableElementCoder = iterableCoder.getElemCoder(); - checkArgument(iterableElementCoder instanceof WindowedValueCoder, - "%s requires a %s<..., %s<%s>> but got a %s", - getClass().getSimpleName(), - KvCoder.class.getSimpleName(), - IterableCoder.class.getSimpleName(), - WindowedValueCoder.class.getSimpleName(), - iterableElementCoder); - WindowedValueCoder<V> windowedValueCoder = - (WindowedValueCoder<V>) iterableElementCoder; - - return windowedValueCoder.getValueCoder(); - } - - @Override - public PCollection<KV<K, Iterable<V>>> apply( - PCollection<KV<K, Iterable<WindowedValue<V>>>> input) { - @SuppressWarnings("unchecked") - KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder = - (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder(); - - Coder<K> keyCoder = inputKvCoder.getKeyCoder(); - Coder<Iterable<WindowedValue<V>>> inputValueCoder = inputKvCoder.getValueCoder(); - - IterableCoder<WindowedValue<V>> inputIterableValueCoder = - (IterableCoder<WindowedValue<V>>) inputValueCoder; - Coder<WindowedValue<V>> inputIterableElementCoder = inputIterableValueCoder.getElemCoder(); - WindowedValueCoder<V> inputIterableWindowedValueCoder = - (WindowedValueCoder<V>) inputIterableElementCoder; - - Coder<V> inputIterableElementValueCoder = inputIterableWindowedValueCoder.getValueCoder(); - Coder<Iterable<V>> outputValueCoder = IterableCoder.of(inputIterableElementValueCoder); - Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder); - - return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal( - input.getPipeline(), windowingStrategy, input.isBounded()) - .setCoder(outputKvCoder); - } - - private <W extends BoundedWindow> - GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W> groupAlsoByWindowsFn( - 
WindowingStrategy<?, W> strategy, - StateInternalsFactory<K> stateInternalsFactory, - Coder<V> inputIterableElementValueCoder) { - return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>( - strategy, - stateInternalsFactory, - SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java deleted file mode 100644 index 08c670e..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunner.java +++ /dev/null @@ -1,145 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.KV; -import org.joda.time.Instant; - -/** - * A customized {@link DoFnRunner} that handles late data dropping for - * a {@link KeyedWorkItem} input {@link OldDoFn}. - * - * <p>It expands windows before checking data lateness. - * - * <p>{@link KeyedWorkItem KeyedWorkItems} are always in empty windows. - * - * @param <K> key type - * @param <InputT> input value element type - * @param <OutputT> output value element type - * @param <W> window type - */ -public class LateDataDroppingDoFnRunner<K, InputT, OutputT, W extends BoundedWindow> - implements DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> { - private final DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner; - private final LateDataFilter lateDataFilter; - - public LateDataDroppingDoFnRunner( - DoFnRunner<KeyedWorkItem<K, InputT>, KV<K, OutputT>> doFnRunner, - WindowingStrategy<?, ?> windowingStrategy, - TimerInternals timerInternals, - Aggregator<Long, Long> droppedDueToLateness) { - this.doFnRunner = doFnRunner; - lateDataFilter = new LateDataFilter(windowingStrategy, timerInternals, droppedDueToLateness); - } - - @Override - public void startBundle() { - doFnRunner.startBundle(); - } - - @Override - public void processElement(WindowedValue<KeyedWorkItem<K, InputT>> elem) { - Iterable<WindowedValue<InputT>> nonLateElements = lateDataFilter.filter( - elem.getValue().key(), elem.getValue().elementsIterable()); - KeyedWorkItem<K, InputT> keyedWorkItem = KeyedWorkItems.workItem( - elem.getValue().key(), elem.getValue().timersIterable(), nonLateElements); - 
doFnRunner.processElement(elem.withValue(keyedWorkItem)); - } - - @Override - public void finishBundle() { - doFnRunner.finishBundle(); - } - - /** - * It filters late data in a {@link KeyedWorkItem}. - */ - @VisibleForTesting - static class LateDataFilter { - private final WindowingStrategy<?, ?> windowingStrategy; - private final TimerInternals timerInternals; - private final Aggregator<Long, Long> droppedDueToLateness; - - public LateDataFilter( - WindowingStrategy<?, ?> windowingStrategy, - TimerInternals timerInternals, - Aggregator<Long, Long> droppedDueToLateness) { - this.windowingStrategy = windowingStrategy; - this.timerInternals = timerInternals; - this.droppedDueToLateness = droppedDueToLateness; - } - - /** - * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains - * non-late input elements. - */ - public <K, InputT> Iterable<WindowedValue<InputT>> filter( - final K key, Iterable<WindowedValue<InputT>> elements) { - Iterable<Iterable<WindowedValue<InputT>>> windowsExpandedElements = Iterables.transform( - elements, - new Function<WindowedValue<InputT>, Iterable<WindowedValue<InputT>>>() { - @Override - public Iterable<WindowedValue<InputT>> apply(final WindowedValue<InputT> input) { - return Iterables.transform( - input.getWindows(), - new Function<BoundedWindow, WindowedValue<InputT>>() { - @Override - public WindowedValue<InputT> apply(BoundedWindow window) { - return WindowedValue.of( - input.getValue(), input.getTimestamp(), window, input.getPane()); - } - }); - }}); - - Iterable<WindowedValue<InputT>> nonLateElements = Iterables.filter( - Iterables.concat(windowsExpandedElements), - new Predicate<WindowedValue<InputT>>() { - @Override - public boolean apply(WindowedValue<InputT> input) { - BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); - if (canDropDueToExpiredWindow(window)) { - // The element is too late for this window. 
- droppedDueToLateness.addValue(1L); - WindowTracing.debug( - "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since too far behind inputWatermark:{}; outputWatermark:{}", - input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - return false; - } else { - return true; - } - } - }); - return nonLateElements; - } - - /** Is {@code window} expired w.r.t. the garbage collection watermark? */ - private boolean canDropDueToExpiredWindow(BoundedWindow window) { - Instant inputWM = timerInternals.currentInputWatermarkTime(); - return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()).isBefore(inputWM); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java deleted file mode 100644 index e809c24..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/NonEmptyPanes.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.AccumulatorCombiningState; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateMerging; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; - -/** - * Tracks which windows have non-empty panes. Specifically, which windows have new elements since - * their last triggering. - * - * @param <W> The kind of windows being tracked. - */ -public abstract class NonEmptyPanes<K, W extends BoundedWindow> { - - static <K, W extends BoundedWindow> NonEmptyPanes<K, W> create( - WindowingStrategy<?, W> strategy, ReduceFn<K, ?, ?, W> reduceFn) { - if (strategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES) { - return new DiscardingModeNonEmptyPanes<>(reduceFn); - } else { - return new GeneralNonEmptyPanes<>(); - } - } - - /** - * Record that some content has been added to the window in {@code context}, and therefore the - * current pane is not empty. - */ - public abstract void recordContent(StateAccessor<K> context); - - /** - * Record that the given pane is empty. - */ - public abstract void clearPane(StateAccessor<K> state); - - /** - * Return true if the current pane for the window in {@code context} is empty. - */ - public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context); - - /** - * Prefetch in preparation for merging. - */ - public abstract void prefetchOnMerge(MergingStateAccessor<K, W> state); - - /** - * Eagerly merge backing state. 
- */ - public abstract void onMerge(MergingStateAccessor<K, W> context); - - /** - * An implementation of {@code NonEmptyPanes} optimized for use with discarding mode. Uses the - * presence of data in the accumulation buffer to record non-empty panes. - */ - private static class DiscardingModeNonEmptyPanes<K, W extends BoundedWindow> - extends NonEmptyPanes<K, W> { - - private ReduceFn<K, ?, ?, W> reduceFn; - - private DiscardingModeNonEmptyPanes(ReduceFn<K, ?, ?, W> reduceFn) { - this.reduceFn = reduceFn; - } - - @Override - public ReadableState<Boolean> isEmpty(StateAccessor<K> state) { - return reduceFn.isEmpty(state); - } - - @Override - public void recordContent(StateAccessor<K> state) { - // Nothing to do -- the reduceFn is tracking contents - } - - @Override - public void clearPane(StateAccessor<K> state) { - // Nothing to do -- the reduceFn is tracking contents - } - - @Override - public void prefetchOnMerge(MergingStateAccessor<K, W> state) { - // Nothing to do -- the reduceFn is tracking contents - } - - @Override - public void onMerge(MergingStateAccessor<K, W> context) { - // Nothing to do -- the reduceFn is tracking contents - } - } - - /** - * An implementation of {@code NonEmptyPanes} for general use. 
- */ - private static class GeneralNonEmptyPanes<K, W extends BoundedWindow> - extends NonEmptyPanes<K, W> { - - private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> - PANE_ADDITIONS_TAG = - StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( - "count", VarLongCoder.of(), new Sum.SumLongFn())); - - @Override - public void recordContent(StateAccessor<K> state) { - state.access(PANE_ADDITIONS_TAG).add(1L); - } - - @Override - public void clearPane(StateAccessor<K> state) { - state.access(PANE_ADDITIONS_TAG).clear(); - } - - @Override - public ReadableState<Boolean> isEmpty(StateAccessor<K> state) { - return state.access(PANE_ADDITIONS_TAG).isEmpty(); - } - - @Override - public void prefetchOnMerge(MergingStateAccessor<K, W> state) { - StateMerging.prefetchCombiningValues(state, PANE_ADDITIONS_TAG); - } - - @Override - public void onMerge(MergingStateAccessor<K, W> context) { - StateMerging.mergeCombiningValues(context, PANE_ADDITIONS_TAG); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java deleted file mode 100644 index 90c10b5..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PaneInfoTracker.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.annotations.VisibleForTesting; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.ValueState; -import org.joda.time.Instant; - -/** - * Determine the timing and other properties of a new pane for a given computation, key and window. - * Incorporates any previous pane, whether the pane has been produced because an - * on-time {@link AfterWatermark} trigger firing, and the relation between the element's timestamp - * and the current output watermark. 
- */
public class PaneInfoTracker {
  /** Source of the current input and output watermarks; assigned once in the constructor. */
  private final TimerInternals timerInternals;

  public PaneInfoTracker(TimerInternals timerInternals) {
    this.timerInternals = timerInternals;
  }

  /** System state tag under which the most recently stored {@link PaneInfo} is kept. */
  @VisibleForTesting
  static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
      StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));

  /** Clears the pane info held under {@link #PANE_INFO_TAG} in {@code state}. */
  public void clear(StateAccessor<?> state) {
    state.access(PANE_INFO_TAG).clear();
  }

  /**
   * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
   * info includes the timing for the pane, whose calculation is quite subtle.
   *
   * <p>The key, the window's maximum timestamp and the handle to the stored pane are captured
   * when this method is called; the stored pane and the watermarks are only read when the
   * returned state is {@linkplain ReadableState#read() read}.
   *
   * @param context supplies the key, the window and the state holding the previous pane
   * @param isFinal should be {@code true} only if the triggering machinery can guarantee
   *     no further firings; it is forwarded unchanged to {@code PaneInfo.createPane}
   */
  public ReadableState<PaneInfo> getNextPaneInfo(
      ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
    final Object key = context.key();
    final ReadableState<PaneInfo> previousPaneFuture =
        context.state().access(PaneInfoTracker.PANE_INFO_TAG);
    final Instant windowMaxTimestamp = context.window().maxTimestamp();

    return new ReadableState<PaneInfo>() {
      @Override
      @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
          justification = "prefetch side effect")
      public ReadableState<PaneInfo> readLater() {
        // Only the prefetch of the previous pane matters here; the returned handle is not needed.
        previousPaneFuture.readLater();
        return this;
      }

      @Override
      public PaneInfo read() {
        PaneInfo previousPane = previousPaneFuture.read();
        return describePane(key, windowMaxTimestamp, previousPane, isFinal);
      }
    };
  }

  /** Writes {@code currentPane} under {@link #PANE_INFO_TAG} in the state of {@code context}. */
  public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
    context.state().access(PANE_INFO_TAG).write(currentPane);
  }

  /**
   * Builds the {@link PaneInfo} that follows {@code previousPane}, deriving its {@link Timing}
   * from the current input and output watermarks.
   *
   * @param key the key being processed; used for trace output only
   * @param windowMaxTimestamp the maximum timestamp of the window the pane belongs to
   * @param previousPane the pane stored by the previous firing, or {@code null} if there was none
   * @param isFinal passed through to {@code PaneInfo.createPane}
   * @throws IllegalStateException if the computed timing is not a legal successor of the previous
   *     pane's timing, or if the previous pane was already marked as last
   */
  private PaneInfo describePane(
      Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
    boolean isFirst = previousPane == null;
    Timing previousTiming = isFirst ? null : previousPane.getTiming();
    long index = isFirst ? 0 : previousPane.getIndex() + 1;
    long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
    Instant outputWM = timerInternals.currentOutputWatermarkTime();
    Instant inputWM = timerInternals.currentInputWatermarkTime();

    // True if it is not possible to assign the element representing this pane a timestamp
    // which will make an ON_TIME pane for any following computation.
    // Ie true if the element's latest possible timestamp is before the current output watermark.
    boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);

    // True if all emitted panes (if any) were EARLY panes.
    // Once the ON_TIME pane has fired, all following panes must be considered LATE even
    // if the output watermark is behind the end of the window.
    boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;

    // True if the input watermark hasn't passed the window's max timestamp.
    boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);

    Timing timing;
    if (isLateForOutput || !onlyEarlyPanesSoFar) {
      // The output watermark has already passed the end of this window, or we have already
      // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
      // consider this pane LATE.
      timing = Timing.LATE;
    } else if (isEarlyForInput) {
      // This is an EARLY firing.
      timing = Timing.EARLY;
      nonSpeculativeIndex = -1;
    } else {
      // This is the unique ON_TIME firing for the window.
      timing = Timing.ON_TIME;
    }

    WindowTracing.debug(
        "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
        + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
        timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);

    if (previousPane != null) {
      // Timing transitions should follow EARLY* ON_TIME? LATE*
      switch (previousTiming) {
        case EARLY:
          checkState(
              timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
              "EARLY cannot transition to %s", timing);
          break;
        case ON_TIME:
          checkState(
              timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
          break;
        case LATE:
          checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
          break;
        case UNKNOWN:
          break;
      }
      checkState(!previousPane.isLast(), "Last pane was not last after all.");
    }

    return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java deleted file mode 100644 index d9f1fbf..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunner.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.values.PCollectionView; -
/**
 * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
 * them from {@link #processElementInReadyWindows(WindowedValue)}.
 */
public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
  private final DoFnRunner<InputT, OutputT> underlying;
  private final Collection<PCollectionView<?>> views;
  private final ReadyCheckingSideInputReader sideInputReader;

  /**
   * Main-input windows found not ready so far in the current bundle. Created in
   * {@link #startBundle()} and reset to {@code null} in {@link #finishBundle()}.
   */
  private Set<BoundedWindow> notReadyWindows;

  /** Creates a runner that delegates to {@code underlying} once the given views are ready. */
  public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
      DoFnRunner<InputT, OutputT> underlying,
      Collection<PCollectionView<?>> views,
      ReadyCheckingSideInputReader sideInputReader) {
    return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
  }

  private PushbackSideInputDoFnRunner(
      DoFnRunner<InputT, OutputT> underlying,
      Collection<PCollectionView<?>> views,
      ReadyCheckingSideInputReader sideInputReader) {
    this.underlying = underlying;
    this.views = views;
    this.sideInputReader = sideInputReader;
  }

  /** Starts a fresh set of not-ready windows, then starts the underlying bundle. */
  @Override
  public void startBundle() {
    notReadyWindows = new HashSet<>();
    underlying.startBundle();
  }

  /**
   * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
   * for each window the element is in that is ready.
   *
   * <p>If there are no side-input views the element is processed as-is, without exploding its
   * windows, and nothing is pushed back.
   *
   * @param elem the element to process in all ready windows
   * @return each element that could not be processed because it requires a side input window
   *     that is not ready.
   */
  public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
    if (views.isEmpty()) {
      processElement(elem);
      return Collections.emptyList();
    }
    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
    for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
      BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
      if (isReady(mainInputWindow)) {
        processElement(windowElem);
      } else {
        notReadyWindows.add(mainInputWindow);
        pushedBack.add(windowElem);
      }
    }
    return pushedBack.build();
  }

  /**
   * Returns whether every side-input view is ready for {@code mainInputWindow}.
   *
   * <p>A window already recorded as not ready in this bundle is rejected immediately, so the
   * side-input reader is not consulted again for it.
   */
  private boolean isReady(BoundedWindow mainInputWindow) {
    if (notReadyWindows.contains(mainInputWindow)) {
      return false;
    }
    for (PCollectionView<?> view : views) {
      BoundedWindow sideInputWindow =
          view.getWindowingStrategyInternal()
              .getWindowFn()
              .getSideInputWindow(mainInputWindow);
      if (!sideInputReader.isReady(view, sideInputWindow)) {
        return false;
      }
    }
    return true;
  }

  /** Passes {@code elem} straight to the underlying runner without any readiness check. */
  @Override
  public void processElement(WindowedValue<InputT> elem) {
    underlying.processElement(elem);
  }

  /**
   * Discards the set of not-ready windows, then calls the underlying
   * {@link DoFnRunner#finishBundle()}.
   */
  @Override
  public void finishBundle() {
    notReadyWindows = null;
    underlying.finishBundle();
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java deleted file mode 100644 index 8135a5b..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFn.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import java.io.Serializable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.joda.time.Instant; - -/** - * Specification for processing to happen after elements have been grouped by key. - * - * @param <K> The type of key being processed. 
- * @param <InputT> The type of input values associated with the key. - * @param <OutputT> The output type that will be produced for each key. - * @param <W> The type of windows this operates on. - */
public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
    implements Serializable {

  /**
   * Base context shared by every processing method of this {@code ReduceFn}: key, window,
   * windowing strategy, state and timers.
   */
  public abstract class Context {
    /** The key currently being processed. */
    public abstract K key();

    /** The window currently being processed. */
    public abstract W window();

    /** The {@link WindowingStrategy} currently in effect. */
    public abstract WindowingStrategy<?, W> windowingStrategy();

    /** The accessor through which state is reached. */
    public abstract StateAccessor<K> state();

    /** The accessor through which timers are reached. */
    public abstract Timers timers();
  }

  /** Context handed to {@link #processValue}; adds the value and its timestamp. */
  public abstract class ProcessValueContext extends Context {
    /** The value currently being processed. */
    public abstract InputT value();

    /** The timestamp that accompanies the value. */
    public abstract Instant timestamp();
  }

  /** Context handed to {@link #onMerge}; narrows the state accessor to a merging one. */
  public abstract class OnMergeContext extends Context {
    /** The accessor through which state is reached, including per-merging-window access. */
    @Override
    public abstract MergingStateAccessor<K, W> state();
  }

  /** Context handed to {@link #onTrigger}; adds pane information and an output sink. */
  public abstract class OnTriggerContext extends Context {
    /** The {@link PaneInfo} describing the trigger firing being processed. */
    public abstract PaneInfo paneInfo();

    /** Emits {@code value} in the current window. */
    public abstract void output(OutputT value);
  }

  //////////////////////////////////////////////////////////////////////////////////////////////////

  /** Invoked once per value of type {@code InputT} associated with the current key. */
  public abstract void processValue(ProcessValueContext c) throws Exception;

  /** Invoked when windows are merged. */
  public abstract void onMerge(OnMergeContext context) throws Exception;

  /**
   * Invoked when triggers fire.
   *
   * <p>A {@link ReduceFn} implementation should call {@link OnTriggerContext#output} for every
   * result that belongs in the pane produced by this trigger firing.
   */
  public abstract void onTrigger(OnTriggerContext context) throws Exception;

  /**
   * Hook invoked ahead of {@link #onMerge} so that any state it will need can be prefetched.
   * The default implementation does nothing.
   *
   * @param c Context to use prefetch from.
   */
  public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}

  /**
   * Hook invoked ahead of {@link #onTrigger} so that any state it will need can be prefetched.
   * The default implementation does nothing.
   *
   * @param context Context to use prefetch from.
   */
  public void prefetchOnTrigger(StateAccessor<K> context) {}

  /**
   * Invoked to clear any persisted state this {@link ReduceFn} may be holding. This happens when
   * the window is closing and will receive no future interactions.
   */
  public abstract void clearState(Context context) throws Exception;

  /** Returns a state that reads {@code true} if there is no buffered state. */
  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java deleted file mode 100644 index 5d27d51..0000000 --- a/runners/core-java/src/main/java/org/apache/beam/sdk/util/ReduceFnContextFactory.java +++ /dev/null @@ -1,493 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.util; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.ImmutableMap; -import java.util.Collection; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.State; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateContext; -import org.apache.beam.sdk.util.state.StateContexts; -import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaces; -import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; -import org.apache.beam.sdk.util.state.StateTag; -import org.joda.time.Instant; - -/** - * Factory for creating instances of the various {@link ReduceFn} contexts. 
- */ -class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { - public interface OnTriggerCallbacks<OutputT> { - void output(OutputT toOutput); - } - - private final K key; - private final ReduceFn<K, InputT, OutputT, W> reduceFn; - private final WindowingStrategy<?, W> windowingStrategy; - private final StateInternals<K> stateInternals; - private final ActiveWindowSet<W> activeWindows; - private final TimerInternals timerInternals; - private final WindowingInternals<?, ?> windowingInternals; - private final PipelineOptions options; - - ReduceFnContextFactory(K key, ReduceFn<K, InputT, OutputT, W> reduceFn, - WindowingStrategy<?, W> windowingStrategy, StateInternals<K> stateInternals, - ActiveWindowSet<W> activeWindows, TimerInternals timerInternals, - WindowingInternals<?, ?> windowingInternals, PipelineOptions options) { - this.key = key; - this.reduceFn = reduceFn; - this.windowingStrategy = windowingStrategy; - this.stateInternals = stateInternals; - this.activeWindows = activeWindows; - this.timerInternals = timerInternals; - this.windowingInternals = windowingInternals; - this.options = options; - } - - /** Where should we look for state associated with a given window? */ - public static enum StateStyle { - /** All state is associated with the window itself. */ - DIRECT, - /** State is associated with the 'state address' windows tracked by the active window set. 
*/ - RENAMED - } - - private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) { - return new StateAccessorImpl<K, W>( - activeWindows, windowingStrategy.getWindowFn().windowCoder(), - stateInternals, StateContexts.createFromComponents(options, windowingInternals, window), - style); - } - - public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) { - return new ContextImpl(stateAccessor(window, style)); - } - - public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue( - W window, InputT value, Instant timestamp, StateStyle style) { - return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp); - } - - public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window, - ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) { - return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks); - } - - public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge( - Collection<W> activeToBeMerged, W mergeResult, StateStyle style) { - return new OnMergeContextImpl( - new MergingStateAccessorImpl<K, W>(activeWindows, - windowingStrategy.getWindowFn().windowCoder(), - stateInternals, style, activeToBeMerged, mergeResult)); - } - - public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) { - return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>( - activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window)); - } - - private class TimersImpl implements Timers { - private final StateNamespace namespace; - - public TimersImpl(StateNamespace namespace) { - checkArgument(namespace instanceof WindowNamespace); - this.namespace = namespace; - } - - @Override - public void setTimer(Instant timestamp, TimeDomain timeDomain) { - timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain)); - } - - @Override - public void deleteTimer(Instant timestamp, TimeDomain timeDomain) { - 
timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain)); - } - - @Override - public Instant currentProcessingTime() { - return timerInternals.currentProcessingTime(); - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return timerInternals.currentSynchronizedProcessingTime(); - } - - @Override - public Instant currentEventTime() { - return timerInternals.currentInputWatermarkTime(); - } - } - - // ====================================================================== - // StateAccessors - // ====================================================================== - static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> { - - - protected final ActiveWindowSet<W> activeWindows; - protected final StateContext<W> context; - protected final StateNamespace windowNamespace; - protected final Coder<W> windowCoder; - protected final StateInternals<K> stateInternals; - protected final StateStyle style; - - public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, - StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) { - - this.activeWindows = activeWindows; - this.windowCoder = windowCoder; - this.stateInternals = stateInternals; - this.context = checkNotNull(context); - this.windowNamespace = namespaceFor(context.window()); - this.style = style; - } - - protected StateNamespace namespaceFor(W window) { - return StateNamespaces.window(windowCoder, window); - } - - protected StateNamespace windowNamespace() { - return windowNamespace; - } - - W window() { - return context.window(); - } - - StateNamespace namespace() { - return windowNamespace(); - } - - @Override - public <StateT extends State> StateT access(StateTag<? 
super K, StateT> address) { - switch (style) { - case DIRECT: - return stateInternals.state(windowNamespace(), address, context); - case RENAMED: - return stateInternals.state( - namespaceFor(activeWindows.writeStateAddress(context.window())), address, context); - } - throw new RuntimeException(); // cases are exhaustive. - } - } - - static class MergingStateAccessorImpl<K, W extends BoundedWindow> - extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> { - private final Collection<W> activeToBeMerged; - - public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, - StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged, - W mergeResult) { - super(activeWindows, windowCoder, stateInternals, - StateContexts.windowOnly(mergeResult), style); - this.activeToBeMerged = activeToBeMerged; - } - - @Override - public <StateT extends State> StateT access(StateTag<? super K, StateT> address) { - switch (style) { - case DIRECT: - return stateInternals.state(windowNamespace(), address, context); - case RENAMED: - return stateInternals.state( - namespaceFor(activeWindows.mergedWriteStateAddress( - activeToBeMerged, context.window())), - address, - context); - } - throw new RuntimeException(); // cases are exhaustive. - } - - @Override - public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( - StateTag<? super K, StateT> address) { - ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); - for (W mergingWindow : activeToBeMerged) { - StateNamespace namespace = null; - switch (style) { - case DIRECT: - namespace = namespaceFor(mergingWindow); - break; - case RENAMED: - namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow)); - break; - } - checkNotNull(namespace); // cases are exhaustive. 
- builder.put(mergingWindow, stateInternals.state(namespace, address, context)); - } - return builder.build(); - } - } - - static class PremergingStateAccessorImpl<K, W extends BoundedWindow> - extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> { - public PremergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder, - StateInternals<K> stateInternals, W window) { - super(activeWindows, windowCoder, stateInternals, - StateContexts.windowOnly(window), StateStyle.RENAMED); - } - - Collection<W> mergingWindows() { - return activeWindows.readStateAddresses(context.window()); - } - - @Override - public <StateT extends State> Map<W, StateT> accessInEachMergingWindow( - StateTag<? super K, StateT> address) { - ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder(); - for (W stateAddressWindow : activeWindows.readStateAddresses(context.window())) { - StateT stateForWindow = - stateInternals.state(namespaceFor(stateAddressWindow), address, context); - builder.put(stateAddressWindow, stateForWindow); - } - return builder.build(); - } - } - - // ====================================================================== - // Contexts - // ====================================================================== - - private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context { - private final StateAccessorImpl<K, W> state; - private final TimersImpl timers; - - private ContextImpl(StateAccessorImpl<K, W> state) { - reduceFn.super(); - this.state = state; - this.timers = new TimersImpl(state.namespace()); - } - - @Override - public K key() { - return key; - } - - @Override - public W window() { - return state.window(); - } - - @Override - public WindowingStrategy<?, W> windowingStrategy() { - return windowingStrategy; - } - - @Override - public StateAccessor<K> state() { - return state; - } - - @Override - public Timers timers() { - return timers; - } - } - - private class ProcessValueContextImpl - extends ReduceFn<K, 
InputT, OutputT, W>.ProcessValueContext { - private final InputT value; - private final Instant timestamp; - private final StateAccessorImpl<K, W> state; - private final TimersImpl timers; - - private ProcessValueContextImpl(StateAccessorImpl<K, W> state, - InputT value, Instant timestamp) { - reduceFn.super(); - this.state = state; - this.value = value; - this.timestamp = timestamp; - this.timers = new TimersImpl(state.namespace()); - } - - @Override - public K key() { - return key; - } - - @Override - public W window() { - return state.window(); - } - - @Override - public WindowingStrategy<?, W> windowingStrategy() { - return windowingStrategy; - } - - @Override - public StateAccessor<K> state() { - return state; - } - - @Override - public InputT value() { - return value; - } - - @Override - public Instant timestamp() { - return timestamp; - } - - @Override - public Timers timers() { - return timers; - } - } - - private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext { - private final StateAccessorImpl<K, W> state; - private final ReadableState<PaneInfo> pane; - private final OnTriggerCallbacks<OutputT> callbacks; - private final TimersImpl timers; - - private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane, - OnTriggerCallbacks<OutputT> callbacks) { - reduceFn.super(); - this.state = state; - this.pane = pane; - this.callbacks = callbacks; - this.timers = new TimersImpl(state.namespace()); - } - - @Override - public K key() { - return key; - } - - @Override - public W window() { - return state.window(); - } - - @Override - public WindowingStrategy<?, W> windowingStrategy() { - return windowingStrategy; - } - - @Override - public StateAccessor<K> state() { - return state; - } - - @Override - public PaneInfo paneInfo() { - return pane.read(); - } - - @Override - public void output(OutputT value) { - callbacks.output(value); - } - - @Override - public Timers timers() { - return timers; - } - } - - 
private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext { - private final MergingStateAccessorImpl<K, W> state; - private final TimersImpl timers; - - private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) { - reduceFn.super(); - this.state = state; - this.timers = new TimersImpl(state.namespace()); - } - - @Override - public K key() { - return key; - } - - @Override - public WindowingStrategy<?, W> windowingStrategy() { - return windowingStrategy; - } - - @Override - public MergingStateAccessor<K, W> state() { - return state; - } - - @Override - public W window() { - return state.window(); - } - - @Override - public Timers timers() { - return timers; - } - } - - private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext { - private final PremergingStateAccessorImpl<K, W> state; - private final TimersImpl timers; - - private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) { - reduceFn.super(); - this.state = state; - this.timers = new TimersImpl(state.namespace()); - } - - @Override - public K key() { - return key; - } - - @Override - public WindowingStrategy<?, W> windowingStrategy() { - return windowingStrategy; - } - - @Override - public MergingStateAccessor<K, W> state() { - return state; - } - - @Override - public W window() { - return state.window(); - } - - @Override - public Timers timers() { - return timers; - } - } -}