http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 3566f7e..89243a3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -18,173 +18,85 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; -import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.collect.ImmutableList; import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.util.Collector; -import org.joda.time.Instant; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; +import java.util.Map; /** * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn} * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}. */ -public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, OUT> { +public class FlinkDoFnFunction<InputT, OutputT> + extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> { - private final DoFn<IN, OUT> doFn; + private final DoFn<InputT, OutputT> doFn; private final SerializedPipelineOptions serializedOptions; - public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) { - this.doFn = doFn; - this.serializedOptions = new SerializedPipelineOptions(options); - } - - @Override - public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception { - ProcessContext context = new ProcessContext(doFn, out); - this.doFn.startBundle(context); - for (IN value : values) { - context.inValue = value; - doFn.processElement(context); - } - this.doFn.finishBundle(context); - } - - private class ProcessContext extends DoFn<IN, OUT>.ProcessContext { - - IN inValue; - Collector<OUT> outCollector; - - public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> outCollector) { - fn.super(); - super.setupDelegateAggregators(); - this.outCollector = outCollector; - } - - @Override - public IN element() { - return this.inValue; - } - + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - @Override - public Instant timestamp() { - return Instant.now(); - } + private final boolean requiresWindowAccess; + private final boolean hasSideInputs; - @Override - public BoundedWindow window() { - return GlobalWindow.INSTANCE; - } - - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } + private final WindowingStrategy<?, ?> windowingStrategy; - @Override - public WindowingInternals<IN, OUT> windowingInternals() { - return new WindowingInternals<IN, OUT>() { - @Override - public StateInternals stateInternals() { - return null; - } - - @Override - public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - - } - - @Override - public TimerInternals timerInternals() { - return null; - } + public FlinkDoFnFunction( + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions options) { + this.doFn = doFn; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializedPipelineOptions(options); + this.windowingStrategy = windowingStrategy; - @Override - public Collection<? extends BoundedWindow> windows() { - return ImmutableList.of(GlobalWindow.INSTANCE); - } + this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.hasSideInputs = !sideInputs.isEmpty(); + } - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } + @Override + public void mapPartition( + Iterable<WindowedValue<InputT>> values, + Collector<WindowedValue<OutputT>> out) throws Exception { + + FlinkProcessContext<InputT, OutputT> context = new FlinkProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + out, + sideInputs); - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - } + this.doFn.startBundle(context); - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new RuntimeException("sideInput() not implemented."); + if (!requiresWindowAccess || hasSideInputs) { + // we don't need to explode the windows + for (WindowedValue<InputT> value : values) { + context = context.forWindowedValue(value); + doFn.processElement(context); + } + } else { + // we need to explode the windows because we have per-window + // side inputs and window access also only works if an element + // is in only one window + for (WindowedValue<InputT> value : values) { + for (WindowedValue<InputT> explodedValue: value.explodeWindows()) { + context = context.forWindowedValue(value); + doFn.processElement(context); } - }; - } - - @Override - public PipelineOptions getPipelineOptions() { - return serializedOptions.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId()); - List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size()); - for (T input : sideInput) { - windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane())); } - return view.fromIterableInternal(windowedValueList); } - @Override - public void output(OUT output) { - outCollector.collect(output); - } - - @Override - public void outputWithTimestamp(OUT output, Instant timestamp) { - // not FLink's way, just output normally - output(output); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, wrapper); - return wrapper; - } - - + // set the windowed value to null so that the logic + // or outputting in finishBundle kicks in + context = context.forWindowedValue(null); + this.doFn.finishBundle(context); } + }
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java deleted file mode 100644 index 7c7084d..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import org.apache.beam.sdk.values.KV; - -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.util.Collector; - -import java.util.Iterator; - -/** - * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for executing a - * {@link org.apache.beam.sdk.transforms.GroupByKey} operation. This reads the input - * {@link org.apache.beam.sdk.values.KV} elements, extracts the key and collects - * the values in a {@code List}. - */ -public class FlinkKeyedListAggregationFunction<K,V> implements GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> { - - @Override - public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, Iterable<V>>> out) throws Exception { - Iterator<KV<K, V>> it = values.iterator(); - KV<K, V> first = it.next(); - Iterable<V> passThrough = new PassThroughIterable<>(first, it); - out.collect(KV.of(first.getKey(), passThrough)); - } - - private static class PassThroughIterable<K, V> implements Iterable<V>, Iterator<V> { - private KV<K, V> first; - private Iterator<KV<K, V>> iterator; - - public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> iterator) { - this.first = first; - this.iterator = iterator; - } - - @Override - public Iterator<V> iterator() { - return this; - } - - @Override - public boolean hasNext() { - return first != null || iterator.hasNext(); - } - - @Override - public V next() { - if (first != null) { - V result = first.getValue(); - first = null; - return result; - } else { - return iterator.next().getValue(); - } - } - - @Override - public void remove() { - throw new UnsupportedOperationException("Cannot remove elements from input."); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java new file mode 100644 index 0000000..9074d72 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PerKeyCombineFnRunner; +import org.apache.beam.sdk.util.PerKeyCombineFnRunners; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Special version of {@link FlinkReduceFunction} that supports merging windows. This + * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the + * same behaviour as {@code MergeOverlappingIntervalWindows}. + * + * <p>This is different from the pair of function for the non-merging windows case + * in that we cannot do combining before the shuffle because elements would not + * yet be in their correct windows for side-input access. + */ +public class FlinkMergingNonShuffleReduceFunction< + K, InputT, AccumT, OutputT, W extends IntervalWindow> + extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> { + + private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn; + + private final DoFn<KV<K, InputT>, KV<K, OutputT>> doFn; + + private final WindowingStrategy<?, W> windowingStrategy; + + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + private final SerializedPipelineOptions serializedOptions; + + public FlinkMergingNonShuffleReduceFunction( + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + + this.combineFn = keyedCombineFn; + + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + + // dummy DoFn because we need one for ProcessContext + this.doFn = new DoFn<KV<K, InputT>, KV<K, OutputT>>() { + @Override + public void processElement(ProcessContext c) throws Exception { + + } + }; + } + + @Override + public void reduce( + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + FlinkProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext = + new FlinkProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + out, + sideInputs); + + PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very unprudent, but correct, for now + List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, InputT>> inputValue: elements) { + for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { + @Override + public int compare( + WindowedValue<KV<K, InputT>> o1, + WindowedValue<KV<K, InputT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // merge windows, we have to do it in an extra pre-processing step and + // can't do it as we go since the window of early elements would not + // be correct when calling the CombineFn + mergeWindow(sortedInput); + + // iterate over the elements that are sorted by window timestamp + final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); + + // create accumulator using the first elements key + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); + InputT firstValue = currentValue.getValue().getValue(); + processContext = processContext.forWindowedValue(currentValue); + AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + + while (iterator.hasNext()) { + WindowedValue<KV<K, InputT>> nextValue = iterator.next(); + IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + + if (currentWindow.equals(nextWindow)) { + // continue accumulating and merge windows + + InputT value = nextValue.getValue().getValue(); + processContext = processContext.forWindowedValue(nextValue); + accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + + } else { + // emit the value that we currently have + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + + currentWindow = nextWindow; + InputT value = nextValue.getValue().getValue(); + processContext = processContext.forWindowedValue(nextValue); + accumulator = combineFnRunner.createAccumulator(key, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + } + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + } + + /** + * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. + * This replaces windows in the input list. + */ + private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { + int currentStart = 0; + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); + + for (int i = 1; i < elements.size(); i++) { + WindowedValue<KV<K, InputT>> nextValue = elements.get(i); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + if (currentWindow.intersects(nextWindow)) { + // we continue + currentWindow = currentWindow.span(nextWindow); + } else { + // retrofit the merged window to all windows up to "currentStart" + for (int j = i - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + currentStart = i; + currentWindow = nextWindow; + } + } + if (currentStart < elements.size() - 1) { + // we have to retrofit the last batch + for (int j = elements.size() - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java new file mode 100644 index 0000000..c12e420 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PerKeyCombineFnRunner; +import org.apache.beam.sdk.util.PerKeyCombineFnRunners; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. This + * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the + * same behaviour as {@code MergeOverlappingIntervalWindows}. + */ +public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow> + extends FlinkPartialReduceFunction<K, InputT, AccumT, W> { + + public FlinkMergingPartialReduceFunction( + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + super(combineFn, windowingStrategy, sideInputs, pipelineOptions); + } + + @Override + public void combine( + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { + + FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = + new FlinkProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + out, + sideInputs); + + PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very unprudent, but correct, for now + List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, InputT>> inputValue: elements) { + for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { + @Override + public int compare( + WindowedValue<KV<K, InputT>> o1, + WindowedValue<KV<K, InputT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // merge windows, we have to do it in an extra pre-processing step and + // can't do it as we go since the window of early elements would not + // be correct when calling the CombineFn + mergeWindow(sortedInput); + + // iterate over the elements that are sorted by window timestamp + final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); + + // create accumulator using the first elements key + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); + InputT firstValue = currentValue.getValue().getValue(); + processContext = processContext.forWindowedValue(currentValue); + AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + + while (iterator.hasNext()) { + WindowedValue<KV<K, InputT>> nextValue = iterator.next(); + IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + + if (currentWindow.equals(nextWindow)) { + // continue accumulating and merge windows + + InputT value = nextValue.getValue().getValue(); + processContext = processContext.forWindowedValue(nextValue); + accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + + } else { + // emit the value that we currently have + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + + currentWindow = nextWindow; + InputT value = nextValue.getValue().getValue(); + processContext = processContext.forWindowedValue(nextValue); + accumulator = combineFnRunner.createAccumulator(key, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + } + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + } + + /** + * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. + * This replaces windows in the input list. + */ + private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { + int currentStart = 0; + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); + + for (int i = 1; i < elements.size(); i++) { + WindowedValue<KV<K, InputT>> nextValue = elements.get(i); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + if (currentWindow.intersects(nextWindow)) { + // we continue + currentWindow = currentWindow.span(nextWindow); + } else { + // retrofit the merged window to all windows up to "currentStart" + for (int j = i - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + currentStart = i; + currentWindow = nextWindow; + } + } + if (currentStart < elements.size() - 1) { + // we have to retrofit the last batch + for (int j = elements.size() - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java new file mode 100644 index 0000000..07d1c97 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PerKeyCombineFnRunner; +import org.apache.beam.sdk.util.PerKeyCombineFnRunners; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Special version of {@link FlinkReduceFunction} that supports merging windows. This + * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the + * same behaviour as {@code MergeOverlappingIntervalWindows}. + */ +public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow> + extends FlinkReduceFunction<K, AccumT, OutputT, W> { + + public FlinkMergingReduceFunction( + CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions); + } + + @Override + public void reduce( + Iterable<WindowedValue<KV<K, AccumT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = + new FlinkProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + out, + sideInputs); + + PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very unprudent, but correct, for now + ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, AccumT>> inputValue: elements) { + for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() { + @Override + public int compare( + WindowedValue<KV<K, AccumT>> o1, + WindowedValue<KV<K, AccumT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // merge windows, we have to do it in an extra pre-processing step and + // can't do it as we go since the window of early elements would not + // be correct when calling the CombineFn + mergeWindow(sortedInput); + + // iterate over the elements that are sorted by window timestamp + final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator(); + + // get the first accumulator + WindowedValue<KV<K, AccumT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); + AccumT accumulator = currentValue.getValue().getValue(); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn, + // in FlinkPartialReduceFunction we already merge the timestamps assigned + // to individual elements, here we just merge them + List<Instant> windowTimestamps = new ArrayList<>(); + windowTimestamps.add(currentValue.getTimestamp()); + + while (iterator.hasNext()) { + WindowedValue<KV<K, AccumT>> nextValue = iterator.next(); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + + if (nextWindow.equals(currentWindow)) { + // continue accumulating and merge windows + + processContext = processContext.forWindowedValue(nextValue); + + accumulator = combineFnRunner.mergeAccumulators( + key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext); + + windowTimestamps.add(nextValue.getTimestamp()); + } else { + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); + + windowTimestamps.clear(); + + processContext = processContext.forWindowedValue(nextValue); + + currentWindow = nextWindow; + accumulator = nextValue.getValue().getValue(); + windowTimestamps.add(nextValue.getTimestamp()); + } + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); + } + + /** + * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. + * This replaces windows in the input list. + */ + private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) { + int currentStart = 0; + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); + + for (int i = 1; i < elements.size(); i++) { + WindowedValue<KV<K, AccumT>> nextValue = elements.get(i); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + if (currentWindow.intersects(nextWindow)) { + // we continue + currentWindow = currentWindow.span(nextWindow); + } else { + // retrofit the merged window to all windows up to "currentStart" + for (int j = i - 1; j >= currentStart; j--) { + WindowedValue<KV<K, AccumT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + currentStart = i; + currentWindow = nextWindow; + } + } + if (currentStart < elements.size() - 1) { + // we have to retrofit the last batch + for (int j = elements.size() - 1; j >= currentStart; j--) { + WindowedValue<KV<K, AccumT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index 476dc5e..f92e76f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -18,28 +18,17 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; -import com.google.common.collect.ImmutableList; - import org.apache.flink.api.common.functions.RichMapPartitionFunction; import org.apache.flink.util.Collector; -import org.joda.time.Instant; -import java.util.ArrayList; -import java.util.List; import java.util.Map; /** @@ -50,112 +39,72 @@ import java.util.Map; * and must tag all outputs with the output number. Afterwards a filter will filter out * those elements that are not to be in a specific output. */ -public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, RawUnionValue> { - - private final DoFn<IN, OUT> doFn; - private final SerializedPipelineOptions serializedPipelineOptions; - private final Map<TupleTag<?>, Integer> outputMap; - - public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) { - this.doFn = doFn; - this.serializedPipelineOptions = new SerializedPipelineOptions(options); - this.outputMap = outputMap; - } - - @Override - public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception { - ProcessContext context = new ProcessContext(doFn, out); - this.doFn.startBundle(context); - for (IN value : values) { - context.inValue = value; - doFn.processElement(context); - } - this.doFn.finishBundle(context); - } +public class FlinkMultiOutputDoFnFunction<InputT, OutputT> + extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<RawUnionValue>> { - private class ProcessContext extends DoFn<IN, OUT>.ProcessContext { + private final DoFn<InputT, OutputT> doFn; + private final SerializedPipelineOptions serializedOptions; - IN inValue; - Collector<RawUnionValue> outCollector; + private final Map<TupleTag<?>, Integer> outputMap; - public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> outCollector) { - fn.super(); - this.outCollector = outCollector; - } + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - @Override - public IN element() { - return this.inValue; - } + private final boolean requiresWindowAccess; + private final boolean hasSideInputs; - @Override - public Instant timestamp() { - return Instant.now(); - } + private final WindowingStrategy<?, ?> windowingStrategy; - @Override - public BoundedWindow window() { - return GlobalWindow.INSTANCE; - } + public FlinkMultiOutputDoFnFunction( + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions options, + Map<TupleTag<?>, Integer> outputMap) { + this.doFn = doFn; + this.serializedOptions = new SerializedPipelineOptions(options); + this.outputMap = outputMap; - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } + this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.hasSideInputs = !sideInputs.isEmpty(); + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + } - @Override - public WindowingInternals<IN, OUT> windowingInternals() { - return null; - } + @Override + public void mapPartition( + Iterable<WindowedValue<InputT>> values, + Collector<WindowedValue<RawUnionValue>> out) throws Exception { + + FlinkProcessContext<InputT, OutputT> context = new FlinkMultiOutputProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + out, + outputMap, + sideInputs); - @Override - public PipelineOptions getPipelineOptions() { - return serializedPipelineOptions.getPipelineOptions(); - } + this.doFn.startBundle(context); - @Override - public <T> T sideInput(PCollectionView<T> view) { - List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal() - .getId()); - List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size()); - for (T input : sideInput) { - windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane())); + if (!requiresWindowAccess || hasSideInputs) { + // we don't need to explode the windows + for (WindowedValue<InputT> value : values) { + context = context.forWindowedValue(value); + doFn.processElement(context); } - return view.fromIterableInternal(windowedValueList); - } - - @Override - public void output(OUT value) { - // assume that index 0 is the default output - outCollector.collect(new RawUnionValue(0, value)); - } - - @Override - public void outputWithTimestamp(OUT output, Instant timestamp) { - // not FLink's way, just output normally - output(output); - } - - @Override - @SuppressWarnings("unchecked") - public <T> void sideOutput(TupleTag<T> tag, T value) { - Integer index = outputMap.get(tag); - if (index != null) { - outCollector.collect(new RawUnionValue(index, value)); + } else { + // we need to explode the windows because we have per-window + // side inputs and window access also only works if an element + // is in only one window + for (WindowedValue<InputT> value : values) { + for (WindowedValue<InputT> explodedValue: value.explodeWindows()) { + context = context.forWindowedValue(value); + doFn.processElement(context); + } } } - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, wrapper); - return null; - } + this.doFn.finishBundle(context); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java new file mode 100644 index 0000000..71b6d27 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.util.Collection; +import java.util.Map; + +/** + * {@link DoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports + * side outputs. + */ +class FlinkMultiOutputProcessContext<InputT, OutputT> + extends FlinkProcessContext<InputT, OutputT> { + + // we need a different Collector from the base class + private final Collector<WindowedValue<RawUnionValue>> collector; + + private final Map<TupleTag<?>, Integer> outputMap; + + + FlinkMultiOutputProcessContext( + PipelineOptions pipelineOptions, + RuntimeContext runtimeContext, + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Collector<WindowedValue<RawUnionValue>> collector, + Map<TupleTag<?>, Integer> outputMap, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { + super( + pipelineOptions, + runtimeContext, + doFn, + windowingStrategy, + new Collector<WindowedValue<OutputT>>() { + @Override + public void collect(WindowedValue<OutputT> outputTWindowedValue) { + + } + + @Override + public void close() { + + } + }, + sideInputs); + + this.collector = collector; + this.outputMap = outputMap; + } + + @Override + public FlinkProcessContext<InputT, OutputT> forWindowedValue( + WindowedValue<InputT> windowedValue) { + this.windowedValue = windowedValue; + return this; + } + + @Override + public void outputWithTimestamp(OutputT value, Instant timestamp) { + if (windowedValue == null) { + // we are in startBundle() or finishBundle() + + try { + Collection windows = windowingStrategy.getWindowFn().assignWindows( + new FlinkNoElementAssignContext( + windowingStrategy.getWindowFn(), + value, + timestamp)); + + collector.collect( + WindowedValue.of( + new RawUnionValue(0, value), + timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), + windows, + PaneInfo.NO_FIRING)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + collector.collect( + WindowedValue.of( + new RawUnionValue(0, value), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPane())); + } + } + + @Override + protected void outputWithTimestampAndWindow( + OutputT value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + collector.collect( + WindowedValue.of( + new RawUnionValue(0, value), timestamp, windows, pane)); + } + + @Override + @SuppressWarnings("unchecked") + public <T> void sideOutput(TupleTag<T> tag, T value) { + if (windowedValue != null) { + sideOutputWithTimestamp(tag, value, windowedValue.getTimestamp()); + } else { + sideOutputWithTimestamp(tag, value, null); + } + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) { + Integer index = outputMap.get(tag); + + if (index == null) { + throw new IllegalArgumentException("Unknown side output tag: " + tag); + } + + if (windowedValue == null) { + // we are in startBundle() or finishBundle() + + try { + Collection windows = windowingStrategy.getWindowFn().assignWindows( + new FlinkNoElementAssignContext( + windowingStrategy.getWindowFn(), + value, + timestamp)); + + collector.collect( + WindowedValue.of( + new RawUnionValue(index, value), + timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), + windows, + PaneInfo.NO_FIRING)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + collector.collect( + WindowedValue.of( + new RawUnionValue(index, value), + windowedValue.getTimestamp(), + windowedValue.getWindows(), + windowedValue.getPane())); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java index 58a36b2..9205a55 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java @@ -18,27 +18,34 @@ package org.apache.beam.runners.flink.translation.functions; import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.util.Collector; /** - * A FlatMap function that filters out those elements that don't belong in this output. We need - * this to implement MultiOutput ParDo functions. + * A {@link FlatMapFunction} function that filters out those elements that don't belong in this + * output. We need this to implement MultiOutput ParDo functions in combination with + * {@link FlinkMultiOutputDoFnFunction}. */ -public class FlinkMultiOutputPruningFunction<T> implements FlatMapFunction<RawUnionValue, T> { +public class FlinkMultiOutputPruningFunction<T> + implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> { - private final int outputTag; + private final int ourOutputTag; - public FlinkMultiOutputPruningFunction(int outputTag) { - this.outputTag = outputTag; + public FlinkMultiOutputPruningFunction(int ourOutputTag) { + this.ourOutputTag = ourOutputTag; } @Override @SuppressWarnings("unchecked") - public void flatMap(RawUnionValue rawUnionValue, Collector<T> collector) throws Exception { - if (rawUnionValue.getUnionTag() == outputTag) { - collector.collect((T) rawUnionValue.getValue()); + public void flatMap( + WindowedValue<RawUnionValue> windowedValue, + Collector<WindowedValue<T>> collector) throws Exception { + int unionTag = windowedValue.getValue().getUnionTag(); + if (unionTag == ourOutputTag) { + collector.collect( + (WindowedValue<T>) windowedValue.withValue(windowedValue.getValue().getValue())); } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java new file mode 100644 index 0000000..892f7a1 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.WindowFn; + +import org.joda.time.Instant; + +import java.util.Collection; + +/** + * {@link WindowFn.AssignContext} for calling a {@link WindowFn} for elements emitted from + * {@link org.apache.beam.sdk.transforms.DoFn#startBundle(DoFn.Context)} + * or {@link DoFn#finishBundle(DoFn.Context)}. + * + * <p>In those cases the {@code WindowFn} is not allowed to access any element information. + */ +class FlinkNoElementAssignContext<InputT, W extends BoundedWindow> + extends WindowFn<InputT, W>.AssignContext { + + private final InputT element; + private final Instant timestamp; + + FlinkNoElementAssignContext( + WindowFn<InputT, W> fn, + InputT element, + Instant timestamp) { + fn.super(); + + this.element = element; + // the timestamp can be null, in that case output is called + // without a timestamp + this.timestamp = timestamp; + } + + @Override + public InputT element() { + return element; + } + + @Override + public Instant timestamp() { + if (timestamp != null) { + return timestamp; + } else { + throw new UnsupportedOperationException("No timestamp available."); + } + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException("No windows available."); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index a2bab2b..c29e1df 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -17,45 +17,170 @@ */ package org.apache.beam.runners.flink.translation.functions; -import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.PerKeyCombineFnRunner; +import org.apache.beam.sdk.util.PerKeyCombineFnRunners; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.api.common.functions.GroupCombineFunction; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +import org.apache.flink.api.common.functions.RichGroupCombineFunction; import org.apache.flink.util.Collector; +import org.joda.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; +import java.util.Map; /** - * Flink {@link org.apache.flink.api.common.functions.GroupCombineFunction} for executing a - * {@link org.apache.beam.sdk.transforms.Combine.PerKey} operation. This reads the input - * {@link org.apache.beam.sdk.values.KV} elements VI, extracts the key and emits accumulated - * values which have the intermediate format VA. + * This is is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} + * on Flink. The second part is {@link FlinkReduceFunction}. This function performs a local + * combine step before shuffling while the latter does the final combination after a shuffle. + * + * <p>The input to {@link #combine(Iterable, Collector)} are elements of the same key but + * for different windows. We have to ensure that we only combine elements of matching + * windows. */ -public class FlinkPartialReduceFunction<K, VI, VA> implements GroupCombineFunction<KV<K, VI>, KV<K, VA>> { +public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow> + extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> { + + protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn; + + protected final DoFn<KV<K, InputT>, KV<K, AccumT>> doFn; + + protected final WindowingStrategy<?, W> windowingStrategy; + + protected final SerializedPipelineOptions serializedOptions; - private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn; + protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?> - keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; + public FlinkPartialReduceFunction( + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + + this.combineFn = combineFn; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + + // dummy DoFn because we need one for ProcessContext + this.doFn = new DoFn<KV<K, InputT>, KV<K, AccumT>>() { + @Override + public void processElement(ProcessContext c) throws Exception { + + } + }; } @Override - public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception { + public void combine( + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { + + FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = + new FlinkProcessContext<>( + serializedOptions.getPipelineOptions(), + getRuntimeContext(), + doFn, + windowingStrategy, + out, + sideInputs); + + PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very unprudent, but correct, for now + ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, InputT>> inputValue: elements) { + for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { + @Override + public int compare( + WindowedValue<KV<K, InputT>> o1, + WindowedValue<KV<K, InputT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // iterate over the elements that are sorted by window timestamp + // + final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); - final Iterator<KV<K, VI>> iterator = elements.iterator(); // create accumulator using the first elements key - KV<K, VI> first = iterator.next(); - K key = first.getKey(); - VI value = first.getValue(); - VA accumulator = keyedCombineFn.createAccumulator(key); - accumulator = keyedCombineFn.addInput(key, accumulator, value); - - while(iterator.hasNext()) { - value = iterator.next().getValue(); - accumulator = keyedCombineFn.addInput(key, accumulator, value); + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); + InputT firstValue = currentValue.getValue().getValue(); + processContext = processContext.forWindowedValue(currentValue); + AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + + while (iterator.hasNext()) { + WindowedValue<KV<K, InputT>> nextValue = iterator.next(); + BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); + + if (nextWindow.equals(currentWindow)) { + // continue accumulating + InputT value = nextValue.getValue().getValue(); + processContext = processContext.forWindowedValue(nextValue); + accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + + } else { + // emit the value that we currently have + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + + currentWindow = nextWindow; + InputT value = nextValue.getValue().getValue(); + processContext = processContext.forWindowedValue(nextValue); + accumulator = combineFnRunner.createAccumulator(key, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + } } - out.collect(KV.of(key, accumulator)); + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java new file mode 100644 index 0000000..0f1885c --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; + +/** + * {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext} for our Flink Wrappers. + */ +class FlinkProcessContext<InputT, OutputT> + extends DoFn<InputT, OutputT>.ProcessContext { + + private final PipelineOptions pipelineOptions; + private final RuntimeContext runtimeContext; + private Collector<WindowedValue<OutputT>> collector; + private final boolean requiresWindowAccess; + + protected WindowedValue<InputT> windowedValue; + + protected WindowingStrategy<?, ?> windowingStrategy; + + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + FlinkProcessContext( + PipelineOptions pipelineOptions, + RuntimeContext runtimeContext, + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Collector<WindowedValue<OutputT>> collector, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { + doFn.super(); + Preconditions.checkNotNull(pipelineOptions); + Preconditions.checkNotNull(runtimeContext); + Preconditions.checkNotNull(doFn); + Preconditions.checkNotNull(collector); + + this.pipelineOptions = pipelineOptions; + this.runtimeContext = runtimeContext; + this.collector = collector; + this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + + super.setupDelegateAggregators(); + } + + FlinkProcessContext( + PipelineOptions pipelineOptions, + RuntimeContext runtimeContext, + DoFn<InputT, OutputT> doFn, + WindowingStrategy<?, ?> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) { + doFn.super(); + Preconditions.checkNotNull(pipelineOptions); + Preconditions.checkNotNull(runtimeContext); + Preconditions.checkNotNull(doFn); + + this.pipelineOptions = pipelineOptions; + this.runtimeContext = runtimeContext; + this.collector = null; + this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + + super.setupDelegateAggregators(); + } + + public FlinkProcessContext<InputT, OutputT> forOutput( + Collector<WindowedValue<OutputT>> collector) { + this.collector = collector; + + // for now, returns ourselves, to be easy on the GC + return this; + } + + + + public FlinkProcessContext<InputT, OutputT> forWindowedValue( + WindowedValue<InputT> windowedValue) { + this.windowedValue = windowedValue; + + // for now, returns ourselves, to be easy on the GC + return this; + } + + @Override + public InputT element() { + return this.windowedValue.getValue(); + } + + + @Override + public Instant timestamp() { + return windowedValue.getTimestamp(); + } + + @Override + public BoundedWindow window() { + if (!requiresWindowAccess) { + throw new UnsupportedOperationException( + "window() is only available in the context of a DoFn marked as RequiresWindow."); + } + return Iterables.getOnlyElement(windowedValue.getWindows()); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + + return new WindowingInternals<InputT, OutputT>() { + + @Override + public StateInternals stateInternals() { + throw new UnsupportedOperationException(); + } + + @Override + public void outputWindowedValue( + OutputT value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + collector.collect(WindowedValue.of(value, timestamp, windows, pane)); + outputWithTimestampAndWindow(value, timestamp, windows, pane); + } + + @Override + public TimerInternals timerInternals() { + throw new UnsupportedOperationException(); + } + + @Override + public Collection<? extends BoundedWindow> windows() { + return windowedValue.getWindows(); + } + + @Override + public PaneInfo pane() { + return windowedValue.getPane(); + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, + Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public <ViewT> ViewT sideInput( + PCollectionView<ViewT> view, + BoundedWindow mainInputWindow) { + + Preconditions.checkNotNull(view, "View passed to sideInput cannot be null"); + Preconditions.checkNotNull( + sideInputs.get(view), + "Side input for " + view + " not available."); + + // get the side input strategy for mapping the window + WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); + + BoundedWindow sideInputWindow = + windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow); + + Map<BoundedWindow, ViewT> sideInputs = + runtimeContext.getBroadcastVariableWithInitializer( + view.getTagInternal().getId(), new SideInputInitializer<>(view)); + return sideInputs.get(sideInputWindow); + } + }; + } + + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions; + } + + @Override + public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) { + Preconditions.checkNotNull(view, "View passed to sideInput cannot be null"); + Preconditions.checkNotNull(sideInputs.get(view), "Side input for " + view + " not available."); + Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator(); + BoundedWindow window; + if (!windowIter.hasNext()) { + throw new IllegalStateException( + "sideInput called when main input element is not in any windows"); + } else { + window = windowIter.next(); + if (windowIter.hasNext()) { + throw new IllegalStateException( + "sideInput called when main input element is in multiple windows"); + } + } + + // get the side input strategy for mapping the window + WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); + + BoundedWindow sideInputWindow = + windowingStrategy.getWindowFn().getSideInputWindow(window); + + Map<BoundedWindow, ViewT> sideInputs = + runtimeContext.getBroadcastVariableWithInitializer( + view.getTagInternal().getId(), new SideInputInitializer<>(view)); + ViewT result = sideInputs.get(sideInputWindow); + if (result == null) { + result = view.fromIterableInternal(Collections.<WindowedValue<?>>emptyList()); + } + return result; + } + + @Override + public void output(OutputT value) { + if (windowedValue != null) { + outputWithTimestamp(value, windowedValue.getTimestamp()); + } else { + outputWithTimestamp(value, null); + } + } + + @Override + public void outputWithTimestamp(OutputT value, Instant timestamp) { + if (windowedValue == null) { + // we are in startBundle() or finishBundle() + + try { + Collection windows = windowingStrategy.getWindowFn().assignWindows( + new FlinkNoElementAssignContext( + windowingStrategy.getWindowFn(), + value, + timestamp)); + + collector.collect( + WindowedValue.of( + value, + timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), + windows, + PaneInfo.NO_FIRING)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + collector.collect( + WindowedValue.of( + value, + timestamp, + windowedValue.getWindows(), + windowedValue.getPane())); + } + } + + protected void outputWithTimestampAndWindow( + OutputT value, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + collector.collect( + WindowedValue.of( + value, timestamp, windows, pane)); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + throw new UnsupportedOperationException(); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutput(tag, output); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> + createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = + new SerializableFnAggregatorWrapper<>(combiner); + runtimeContext.addAccumulator(name, wrapper); + return wrapper; + } +}