http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java new file mode 100644 index 0000000..26fd0b4 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * Special version of {@link FlinkReduceFunction} that supports merging windows. This + * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the + * same behaviour as {@code MergeOverlappingIntervalWindows}. + * + * <p>This is different from the pair of functions for the non-merging windows case + * in that we cannot do combining before the shuffle because elements would not + * yet be in their correct windows for side-input access.
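+ * + * <p>Instead, the complete combine runs here after the shuffle: for each merged window the + * accumulator is created, all inputs are added and the output is extracted in a single + * {@code reduce} call.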
+ */ +public class FlinkMergingNonShuffleReduceFunction< + K, InputT, AccumT, OutputT, W extends IntervalWindow> + extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> { + + private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn; + + private final WindowingStrategy<?, W> windowingStrategy; + + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + private final SerializedPipelineOptions serializedOptions; + + public FlinkMergingNonShuffleReduceFunction( + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + + this.combineFn = keyedCombineFn; + + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + + } + + @Override + public void reduce( + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + PipelineOptions options = serializedOptions.getPipelineOptions(); + + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very imprudent, but correct, for now + List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, InputT>> inputValue : elements) { + for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { + @Override + public int compare( + WindowedValue<KV<K, InputT>> o1, + WindowedValue<KV<K, InputT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // merge windows, we have to do it in an extra pre-processing step and + // can't do it as we go since the window of early elements would not + // be correct when calling the CombineFn + mergeWindow(sortedInput); + + // iterate over the elements that are sorted by window timestamp + final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); + + // create accumulator using the first element's key + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); + InputT firstValue = currentValue.getValue().getValue(); + AccumT accumulator = + combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, + options, sideInputReader, currentValue.getWindows()); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + + while (iterator.hasNext()) { + WindowedValue<KV<K, InputT>> nextValue = iterator.next(); + IntervalWindow nextWindow = +
(IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + + if (currentWindow.equals(nextWindow)) { + // continue accumulating and merge windows + + InputT value = nextValue.getValue().getValue(); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); + + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + + } else { + // emit the value that we currently have + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + + currentWindow = nextWindow; + currentValue = nextValue; + InputT value = nextValue.getValue().getValue(); + accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + } + + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + } + + /** + * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. + * This replaces windows in the input list. + */ + private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { + int currentStart = 0; + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); + + for (int i = 1; i < elements.size(); i++) { + WindowedValue<KV<K, InputT>> nextValue = elements.get(i); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + if (currentWindow.intersects(nextWindow)) { + // we continue + currentWindow = currentWindow.span(nextWindow); + } else { + // retrofit the merged window to all windows up to "currentStart" + for (int j = i - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + currentStart = i; + currentWindow = nextWindow; + } + } + if (currentStart < elements.size() - 1) { + // we have to retrofit the last batch + for (int j = elements.size() - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + } + } + +}
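The mergeWindow pre-processing in the file above implements the same merging of overlapping interval windows as {@code MergeOverlappingIntervalWindows}, rewritten in place over the sorted element list. A minimal standalone sketch of the same idea, assuming (as the patch does) a list of IntervalWindows already sorted by window end timestamp; the MergeOverlappingSketch helper below is hypothetical, for illustration only, and not part of this commit:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;

class MergeOverlappingSketch {
  // Merges intersecting IntervalWindows; assumes the input list is sorted
  // by window end timestamp, mirroring the sort used in reduce() above.
  static List<IntervalWindow> merge(List<IntervalWindow> sorted) {
    List<IntervalWindow> merged = new ArrayList<>();
    if (sorted.isEmpty()) {
      return merged;
    }
    IntervalWindow current = sorted.get(0);
    for (int i = 1; i < sorted.size(); i++) {
      IntervalWindow next = sorted.get(i);
      if (current.intersects(next)) {
        // overlapping: widen the current merged window to cover both
        current = current.span(next);
      } else {
        // no overlap: close the current batch and start a new one
        merged.add(current);
        current = next;
      }
    }
    merged.add(current);
    return merged;
  }
}

The patch differs only in that it does not return the merged windows: it retrofits each merged window back onto every element of the batch it covers, so that the CombineFn later sees elements grouped under identical windows.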
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java new file mode 100644 index 0000000..c68f155 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. This + * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the + * same behaviour as {@code MergeOverlappingIntervalWindows}. 
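+ * + * <p>Unlike {@link FlinkMergingNonShuffleReduceFunction}, this function runs before the + * shuffle and only emits per-window accumulators ({@code KV<K, AccumT>}); merging those + * accumulators and extracting the final output is left to {@link FlinkMergingReduceFunction}.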
+ */ +public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow> + extends FlinkPartialReduceFunction<K, InputT, AccumT, W> { + + public FlinkMergingPartialReduceFunction( + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + super(combineFn, windowingStrategy, sideInputs, pipelineOptions); + } + + @Override + public void combine( + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { + + PipelineOptions options = serializedOptions.getPipelineOptions(); + + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very imprudent, but correct, for now + List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, InputT>> inputValue : elements) { + for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { + @Override + public int compare( + WindowedValue<KV<K, InputT>> o1, + WindowedValue<KV<K, InputT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // merge windows, we have to do it in an extra pre-processing step and + // can't do it as we go since the window of early elements would not + // be correct when calling the CombineFn + mergeWindow(sortedInput); + + // iterate over the elements that are sorted by window timestamp + final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); + + // create accumulator using the first element's key + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); + InputT firstValue = currentValue.getValue().getValue(); + AccumT accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, + options, sideInputReader, currentValue.getWindows()); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + + while (iterator.hasNext()) { + WindowedValue<KV<K, InputT>> nextValue = iterator.next(); + IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + + if (currentWindow.equals(nextWindow)) { + // continue accumulating and merge windows + + InputT value = nextValue.getValue().getValue(); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); + + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + + } else { + // emit the value
that we currently have + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + + currentWindow = nextWindow; + currentValue = nextValue; + InputT value = nextValue.getValue().getValue(); + accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + } + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + } + + /** + * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. + * This replaces windows in the input list. + */ + private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { + int currentStart = 0; + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); + + for (int i = 1; i < elements.size(); i++) { + WindowedValue<KV<K, InputT>> nextValue = elements.get(i); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + if (currentWindow.intersects(nextWindow)) { + // we continue + currentWindow = currentWindow.span(nextWindow); + } else { + // retrofit the merged window to all windows up to "currentStart" + for (int j = i - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + currentStart = i; + currentWindow = nextWindow; + } + } + if (currentStart < elements.size() - 1) { + // we have to retrofit the last batch + for (int j = elements.size() - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java new file mode 100644 index 0000000..84b3adc --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * Special version of {@link FlinkReduceFunction} that supports merging windows. This + * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the + * same behaviour as {@code MergeOverlappingIntervalWindows}. + */ +public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow> + extends FlinkReduceFunction<K, AccumT, OutputT, W> { + + public FlinkMergingReduceFunction( + CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions); + } + + @Override + public void reduce( + Iterable<WindowedValue<KV<K, AccumT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + PipelineOptions options = serializedOptions.getPipelineOptions(); + + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very imprudent, but correct, for now + ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, AccumT>> inputValue : elements) { + for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() { + @Override + public int compare( + WindowedValue<KV<K, AccumT>> o1, + WindowedValue<KV<K, AccumT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // merge windows, we have to do it in an extra pre-processing step and + // can't do it as we go since the window of early elements would not + // be correct when calling the CombineFn + mergeWindow(sortedInput); + + // iterate over the elements that are sorted by window timestamp + final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator(); + + // get the first accumulator + WindowedValue<KV<K, AccumT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); + AccumT accumulator = currentValue.getValue().getValue(); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn, + // in FlinkPartialReduceFunction we already merge the timestamps assigned + // to individual elements, here we just merge them + List<Instant> windowTimestamps = new ArrayList<>(); + windowTimestamps.add(currentValue.getTimestamp()); + + while (iterator.hasNext()) { + WindowedValue<KV<K, AccumT>> nextValue = iterator.next(); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + + if (nextWindow.equals(currentWindow)) { + // continue accumulating and merge windows + + accumulator = combineFnRunner.mergeAccumulators( + key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), + options, sideInputReader, currentValue.getWindows()); + + windowTimestamps.add(nextValue.getTimestamp()); + } else { + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); + + windowTimestamps.clear(); + + currentWindow = nextWindow; + currentValue = nextValue; + accumulator = nextValue.getValue().getValue(); + windowTimestamps.add(nextValue.getTimestamp()); + } + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); + } + + /** + * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. + * This replaces windows in the input list.
+ */ + private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) { + int currentStart = 0; + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); + + for (int i = 1; i < elements.size(); i++) { + WindowedValue<KV<K, AccumT>> nextValue = elements.get(i); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + if (currentWindow.intersects(nextWindow)) { + // we continue + currentWindow = currentWindow.span(nextWindow); + } else { + // retrofit the merged window to all windows up to "currentStart" + for (int j = i - 1; j >= currentStart; j--) { + WindowedValue<KV<K, AccumT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + currentStart = i; + currentWindow = nextWindow; + } + } + if (currentStart < elements.size() - 1) { + // we have to retrofit the last batch + for (int j = elements.size() - 1; j >= currentStart; j--) { + WindowedValue<KV<K, AccumT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java new file mode 100644 index 0000000..9071cc5 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.sdk.transforms.join.RawUnionValue; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +/** + * A {@link FlatMapFunction} that filters out those elements that don't belong in this + * output. We need this to implement MultiOutput ParDo functions in combination with + * {@link FlinkDoFnFunction}.
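+ * + * <p>Each input element wraps a {@link RawUnionValue} whose union tag identifies the logical + * output it belongs to; elements whose tag does not match the tag of this output are dropped, + * all others are unwrapped and emitted.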
+ */ +public class FlinkMultiOutputPruningFunction<T> + implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> { + + private final int ourOutputTag; + + public FlinkMultiOutputPruningFunction(int ourOutputTag) { + this.ourOutputTag = ourOutputTag; + } + + @Override + @SuppressWarnings("unchecked") + public void flatMap( + WindowedValue<RawUnionValue> windowedValue, + Collector<WindowedValue<T>> collector) throws Exception { + int unionTag = windowedValue.getValue().getUnionTag(); + if (unionTag == ourOutputTag) { + collector.collect( + (WindowedValue<T>) windowedValue.withValue(windowedValue.getValue().getValue())); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java new file mode 100644 index 0000000..847a00a --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import java.io.IOException; +import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * A {@link StepContext} for Flink Batch Runner execution. 
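+ * Every method is a no-op or returns {@code null}; callers that need real state or timers, + * such as {@link FlinkStatefulDoFnFunction}, override {@link #stateInternals()} and + * {@link #timerInternals()}.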
+ */ +public class FlinkNoOpStepContext implements StepContext { + + @Override + public String getStepName() { + return null; + } + + @Override + public String getTransformName() { + return null; + } + + @Override + public void noteOutput(WindowedValue<?> output) { + + } + + @Override + public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) { + + } + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, + Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, + W window, + Coder<W> windowCoder) throws IOException { + } + + @Override + public StateInternals<?> stateInternals() { + return null; + } + + @Override + public TimerInternals timerInternals() { + return null; + } +} + http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java new file mode 100644 index 0000000..1d1ff9f --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.flink.api.common.functions.RichGroupCombineFunction; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * This is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} + * on Flink. The second part is {@link FlinkReduceFunction}.
This function performs a local + combine step before shuffling while the latter does the final combination after a shuffle. + * + * <p>The inputs to {@link #combine(Iterable, Collector)} are elements of the same key but + * for different windows. We have to ensure that we only combine elements of matching + * windows. + */ +public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow> + extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> { + + protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn; + + protected final WindowingStrategy<?, W> windowingStrategy; + + protected final SerializedPipelineOptions serializedOptions; + + protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + public FlinkPartialReduceFunction( + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + + this.combineFn = combineFn; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + + } + + @Override + public void combine( + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { + + PipelineOptions options = serializedOptions.getPipelineOptions(); + + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<?
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very imprudent, but correct, for now + ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, InputT>> inputValue : elements) { + for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { + @Override + public int compare( + WindowedValue<KV<K, InputT>> o1, + WindowedValue<KV<K, InputT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // iterate over the elements that are sorted by window timestamp + // + final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); + + // create accumulator using the first element's key + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); + InputT firstValue = currentValue.getValue().getValue(); + AccumT accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, + options, sideInputReader, currentValue.getWindows()); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + + while (iterator.hasNext()) { + WindowedValue<KV<K, InputT>> nextValue = iterator.next(); + BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); + + if (nextWindow.equals(currentWindow)) { + // continue accumulating + InputT value = nextValue.getValue().getValue(); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); + + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + + } else { + // emit the value that we currently have + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + + currentWindow = nextWindow; + currentValue = nextValue; + InputT value = nextValue.getValue().getValue(); + accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + } + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, accumulator), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java new file mode 100644 index 0000000..3e4f742 --- /dev/null +++
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + * + *     http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * This is the second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} + * on Flink, the first part is {@link FlinkPartialReduceFunction}. This function performs the final + * combination of the pre-combined values after a shuffle. + * + * <p>The inputs to {@link #reduce(Iterable, Collector)} are elements of the same key but + * for different windows. We have to ensure that we only combine elements of matching + * windows.
+ */ +public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> + extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> { + + protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn; + + protected final WindowingStrategy<?, W> windowingStrategy; + + protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + + protected final SerializedPipelineOptions serializedOptions; + + public FlinkReduceFunction( + CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, + WindowingStrategy<?, W> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions) { + + this.combineFn = keyedCombineFn; + + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + + } + + @Override + public void reduce( + Iterable<WindowedValue<KV<K, AccumT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + PipelineOptions options = serializedOptions.getPipelineOptions(); + + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very imprudent, but correct, for now + ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, AccumT>> inputValue: elements) { + for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() { + @Override + public int compare( + WindowedValue<KV<K, AccumT>> o1, + WindowedValue<KV<K, AccumT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + // iterate over the elements that are sorted by window timestamp + // + final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator(); + + // get the first accumulator + WindowedValue<KV<K, AccumT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); + AccumT accumulator = currentValue.getValue().getValue(); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn, + // in FlinkPartialReduceFunction we already merge the timestamps assigned + // to individual elements, here we just merge them + List<Instant> windowTimestamps = new ArrayList<>(); + windowTimestamps.add(currentValue.getTimestamp()); + + while (iterator.hasNext()) { + WindowedValue<KV<K, AccumT>> nextValue = iterator.next(); + BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); + + if (nextWindow.equals(currentWindow)) { + // continue accumulating + accumulator = combineFnRunner.mergeAccumulators( + key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), + options, sideInputReader, currentValue.getWindows()); + + windowTimestamps.add(nextValue.getTimestamp()); + } else { + // emit the value that we currently have + out.collect( +
WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); + + windowTimestamps.clear(); + + currentWindow = nextWindow; + currentValue = nextValue; + accumulator = nextValue.getValue().getValue(); + windowTimestamps.add(nextValue.getTimestamp()); + } + + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + outputTimeFn.merge(currentWindow, windowTimestamps), + currentWindow, + PaneInfo.NO_FIRING)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java new file mode 100644 index 0000000..c317182 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * A {@link SideInputReader} for the Flink Batch Runner. 
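+ * Side inputs are expected as Flink broadcast variables keyed by the id of the side input's + * {@link TupleTag}; {@link SideInputInitializer} materializes the broadcast data into a map + * from window to side-input value.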
+ */ +public class FlinkSideInputReader implements SideInputReader { + + private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs; + + private RuntimeContext runtimeContext; + + public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView, + RuntimeContext runtimeContext) { + sideInputs = new HashMap<>(); + for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) { + sideInputs.put(entry.getKey().getTagInternal(), entry.getValue()); + } + this.runtimeContext = runtimeContext; + } + + @Nullable + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + checkNotNull(view, "View passed to sideInput cannot be null"); + TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal(); + checkNotNull( + sideInputs.get(tag), + "Side input for " + view + " not available."); + + Map<BoundedWindow, T> sideInputs = + runtimeContext.getBroadcastVariableWithInitializer( + tag.getId(), new SideInputInitializer<>(view)); + T result = sideInputs.get(window); + if (result == null) { + result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); + } + return result; + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + return sideInputs.containsKey(view.getTagInternal()); + } + + @Override + public boolean isEmpty() { + return sideInputs.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java new file mode 100644 index 0000000..c8193d2 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.functions; + +import static org.apache.flink.util.Preconditions.checkArgument; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.InMemoryStateInternals; +import org.apache.beam.runners.core.InMemoryTimerInternals; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.StateNamespace; +import org.apache.beam.runners.core.StateNamespaces; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink Batch Runner. + */ +public class FlinkStatefulDoFnFunction<K, V, OutputT> + extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> { + + private final DoFn<KV<K, V>, OutputT> dofn; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; + private final SerializedPipelineOptions serializedOptions; + private final Map<TupleTag<?>, Integer> outputMap; + private final TupleTag<OutputT> mainOutputTag; + private transient DoFnInvoker doFnInvoker; + + public FlinkStatefulDoFnFunction( + DoFn<KV<K, V>, OutputT> dofn, + WindowingStrategy<?, ?> windowingStrategy, + Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, + PipelineOptions pipelineOptions, + Map<TupleTag<?>, Integer> outputMap, + TupleTag<OutputT> mainOutputTag) { + + this.dofn = dofn; + this.windowingStrategy = windowingStrategy; + this.sideInputs = sideInputs; + this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); + this.outputMap = outputMap; + this.mainOutputTag = mainOutputTag; + } + + @Override + public void reduce( + Iterable<WindowedValue<KV<K, V>>> values, + Collector<WindowedValue<OutputT>> out) throws Exception { + RuntimeContext runtimeContext = getRuntimeContext(); + + DoFnRunners.OutputManager outputManager; + if (outputMap == null) { + outputManager = new FlinkDoFnFunction.DoFnOutputManager(out); + } else { + // it has some additional Outputs + outputManager = + new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap); + } + + final Iterator<WindowedValue<KV<K, V>>> iterator = values.iterator(); + + // get the first value, we need this for initializing the state internals with the key. + // we are guaranteed to have a first value, otherwise reduce() would not have been called. 
+ WindowedValue<KV<K, V>> currentValue = iterator.next(); + final K key = currentValue.getValue().getKey(); + + final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key); + + // Used with Batch, we know that all the data is available for this key. We can't use the + // timer manager from the context because it doesn't exist. So we create one and advance + // time to the end after processing all elements. + final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + timerInternals.advanceProcessingTime(Instant.now()); + timerInternals.advanceSynchronizedProcessingTime(Instant.now()); + + DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner( + serializedOptions.getPipelineOptions(), dofn, + new FlinkSideInputReader(sideInputs, runtimeContext), + outputManager, + mainOutputTag, + // see SimpleDoFnRunner, just use it to limit number of additional outputs + Collections.<TupleTag<?>>emptyList(), + new FlinkNoOpStepContext() { + @Override + public StateInternals<?> stateInternals() { + return stateInternals; + } + @Override + public TimerInternals timerInternals() { + return timerInternals; + } + }, + new FlinkAggregatorFactory(runtimeContext), + windowingStrategy); + + doFnRunner.startBundle(); + + doFnRunner.processElement(currentValue); + while (iterator.hasNext()) { + currentValue = iterator.next(); + doFnRunner.processElement(currentValue); + } + + // Finish any pending windows by advancing the input watermark to infinity. + timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); + + // Finally, advance the processing time to infinity to fire any timers. + timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); + + fireEligibleTimers(timerInternals, doFnRunner); + + doFnRunner.finishBundle(); + } + + private void fireEligibleTimers( + InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner) + throws Exception { + + while (true) { + + TimerInternals.TimerData timer; + boolean hasFired = false; + + while ((timer = timerInternals.removeNextEventTimer()) != null) { + hasFired = true; + fireTimer(timer, runner); + } + while ((timer = timerInternals.removeNextProcessingTimer()) != null) { + hasFired = true; + fireTimer(timer, runner); + } + while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { + hasFired = true; + fireTimer(timer, runner); + } + if (!hasFired) { + break; + } + } + } + + private void fireTimer( + TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) { + StateNamespace namespace = timer.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); + doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain()); + } + + @Override + public void open(Configuration parameters) throws Exception { + doFnInvoker = DoFnInvokers.invokerFor(dofn); + doFnInvoker.invokeSetup(); + } + + @Override + public void close() throws Exception { + doFnInvoker.invokeTeardown(); + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java ---------------------------------------------------------------------- diff --git 
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
new file mode 100644
index 0000000..12222b4
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+
+/**
+ * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map}
+ * from window to side input.
+ */
+public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow>
+    implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> {
+
+  PCollectionView<ViewT> view;
+
+  public SideInputInitializer(PCollectionView<ViewT> view) {
+    this.view = view;
+  }
+
+  @Override
+  public Map<BoundedWindow, ViewT> initializeBroadcastVariable(
+      Iterable<WindowedValue<ElemT>> inputValues) {
+
+    // first partition the elements into their windows
+    Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>();
+    for (WindowedValue<ElemT> value : inputValues) {
+      for (BoundedWindow window : value.getWindows()) {
+        List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window);
+        if (windowedValues == null) {
+          windowedValues = new ArrayList<>();
+          partitionedElements.put(window, windowedValues);
+        }
+        windowedValues.add(value);
+      }
+    }
+
+    // then materialize each per-window view by applying the ViewFn to its partition
+    Map<BoundedWindow, ViewT> resultMap = new HashMap<>();
+
+    for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements
+        : partitionedElements.entrySet()) {
+
+      @SuppressWarnings("unchecked")
+      Iterable<WindowedValue<?>> elementsIterable =
+          (List<WindowedValue<?>>) (List<?>) elements.getValue();
+
+      resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable));
+    }
+
+    return resultMap;
+  }
+}
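As a sketch of how the runner side consumes this initializer (the class and variable names here are ours, for illustration only): Flink exposes lazily initialized broadcast variables on the RuntimeContext, and getBroadcastVariableWithInitializer runs the initializer at most once per task, caching the resulting map.

    import java.util.Map;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.values.PCollectionView;
    import org.apache.flink.api.common.functions.AbstractRichFunction;

    // Hypothetical consumer: any rich Flink function can resolve the
    // side-input map once per task through the initializer.
    abstract class SideInputConsumer<ViewT> extends AbstractRichFunction {
      Map<BoundedWindow, ViewT> resolveSideInput(
          String broadcastName, PCollectionView<ViewT> view) {
        return getRuntimeContext().getBroadcastVariableWithInitializer(
            broadcastName, new SideInputInitializer<>(view));
      }
    }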
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
new file mode 100644
index 0000000..9f11212
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.functions;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
new file mode 100644
index 0000000..af4b354
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation;
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
new file mode 100644
index 0000000..9b449aa
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.AtomicType;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeinfo.TypeInformation} for
+ * Beam {@link org.apache.beam.sdk.coders.Coder Coders}.
+ */
+public class CoderTypeInformation<T> extends TypeInformation<T> implements AtomicType<T> {
+
+  private final Coder<T> coder;
+
+  public CoderTypeInformation(Coder<T> coder) {
+    checkNotNull(coder);
+    this.coder = coder;
+  }
+
+  public Coder<T> getCoder() {
+    return coder;
+  }
+
+  @Override
+  public boolean isBasicType() {
+    return false;
+  }
+
+  @Override
+  public boolean isTupleType() {
+    return false;
+  }
+
+  @Override
+  public int getArity() {
+    return 1;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Class<T> getTypeClass() {
+    // We don't have access to the actual type class here, so the best
+    // we can do is return Object.class.
+    return (Class<T>) Object.class;
+  }
+
+  @Override
+  public boolean isKeyType() {
+    return true;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public TypeSerializer<T> createSerializer(ExecutionConfig config) {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public int getTotalFields() {
+    return 2;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    CoderTypeInformation that = (CoderTypeInformation) o;
+
+    return coder.equals(that.coder);
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeInformation;
+  }
+
+  @Override
+  public String toString() {
+    return "CoderTypeInformation{coder=" + coder + '}';
+  }
+
+  @Override
+  public TypeComparator<T> createComparator(
+      boolean sortOrderAscending, ExecutionConfig executionConfig) {
+    throw new UnsupportedOperationException(
+        "Non-encoded values cannot be compared directly.");
+  }
+}
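To make the wiring concrete, here is a small sketch (ours) of wrapping a standard Beam coder as Flink type information; KvCoder, StringUtf8Coder, and VarLongCoder are stock Beam SDK coders.

    import org.apache.beam.sdk.coders.KvCoder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.values.KV;
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeutils.TypeSerializer;

    class CoderTypeInformationExample {
      static TypeSerializer<KV<String, Long>> kvSerializer() {
        // Wrap a Beam coder so Flink treats KV<String, Long> as one opaque type.
        KvCoder<String, Long> kvCoder = KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of());
        TypeInformation<KV<String, Long>> typeInfo = new CoderTypeInformation<>(kvCoder);
        // Flink requests a serializer from the TypeInformation when it configures an operator.
        return typeInfo.createSerializer(new ExecutionConfig());
      }
    }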
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
new file mode 100644
index 0000000..e210ed9
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.EOFException;
+import java.io.IOException;
+import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeSerializer} for
+ * Beam {@link org.apache.beam.sdk.coders.Coder Coders}.
+ */
+public class CoderTypeSerializer<T> extends TypeSerializer<T> {
+
+  private final Coder<T> coder;
+
+  public CoderTypeSerializer(Coder<T> coder) {
+    this.coder = coder;
+  }
+
+  @Override
+  public boolean isImmutableType() {
+    return false;
+  }
+
+  @Override
+  public CoderTypeSerializer<T> duplicate() {
+    return new CoderTypeSerializer<>(coder);
+  }
+
+  @Override
+  public T createInstance() {
+    return null;
+  }
+
+  @Override
+  public T copy(T t) {
+    try {
+      return CoderUtils.clone(coder, t);
+    } catch (CoderException e) {
+      throw new RuntimeException("Could not clone.", e);
+    }
+  }
+
+  @Override
+  public T copy(T t, T reuse) {
+    return copy(t);
+  }
+
+  @Override
+  public int getLength() {
+    // variable-length records; Flink uses -1 to signal this
+    return -1;
+  }
+
+  @Override
+  public void serialize(T t, DataOutputView dataOutputView) throws IOException {
+    DataOutputViewWrapper outputWrapper = new DataOutputViewWrapper(dataOutputView);
+    coder.encode(t, outputWrapper, Coder.Context.NESTED);
+  }
+
+  @Override
+  public T deserialize(DataInputView dataInputView) throws IOException {
+    try {
+      DataInputViewWrapper inputWrapper = new DataInputViewWrapper(dataInputView);
+      return coder.decode(inputWrapper, Coder.Context.NESTED);
+    } catch (CoderException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof EOFException) {
+        throw (EOFException) cause;
+      } else {
+        throw e;
+      }
+    }
+  }
+
+  @Override
+  public T deserialize(T t, DataInputView dataInputView) throws IOException {
+    return deserialize(dataInputView);
+  }
+
+  @Override
+  public void copy(
+      DataInputView dataInputView,
+      DataOutputView dataOutputView) throws IOException {
+    serialize(deserialize(dataInputView), dataOutputView);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    CoderTypeSerializer that = (CoderTypeSerializer) o;
+    return coder.equals(that.coder);
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof CoderTypeSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return coder.hashCode();
+  }
+}
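A quick round-trip through the serializer, for intuition. DataOutputViewStreamWrapper and DataInputViewStreamWrapper are standard Flink helpers that back a view with ordinary streams; the rest is a plain in-memory echo test of our own devising.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    class CoderTypeSerializerRoundTrip {
      static String roundTrip(String value) throws IOException {
        CoderTypeSerializer<String> serializer =
            new CoderTypeSerializer<>(StringUtf8Coder.of());

        // Serialize into an in-memory buffer...
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        serializer.serialize(value, new DataOutputViewStreamWrapper(bytes));

        // ...and read the value back through the same coder.
        return serializer.deserialize(
            new DataInputViewStreamWrapper(new ByteArrayInputStream(bytes.toByteArray())));
      }
    }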
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
new file mode 100644
index 0000000..667ef45
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueComparator.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+
+/**
+ * Flink {@link org.apache.flink.api.common.typeutils.TypeComparator} for Beam values that have
+ * been encoded to byte data by a {@link Coder}.
+ */
+public class EncodedValueComparator extends TypeComparator<byte[]> {
+
+  /** For storing the reference key in encoded form. */
+  private transient byte[] encodedReferenceKey;
+
+  private final boolean ascending;
+
+  public EncodedValueComparator(boolean ascending) {
+    this.ascending = ascending;
+  }
+
+  @Override
+  public int hash(byte[] record) {
+    return Arrays.hashCode(record);
+  }
+
+  @Override
+  public void setReference(byte[] toCompare) {
+    this.encodedReferenceKey = toCompare;
+  }
+
+  @Override
+  public boolean equalToReference(byte[] candidate) {
+    if (encodedReferenceKey.length != candidate.length) {
+      return false;
+    }
+    int len = candidate.length;
+    for (int i = 0; i < len; i++) {
+      if (encodedReferenceKey[i] != candidate[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public int compareToReference(TypeComparator<byte[]> other) {
+    // VERY IMPORTANT: compareToReference does not behave like Comparable.compare;
+    // the meaning of the return value is inverted.
+
+    EncodedValueComparator otherEncodedValueComparator = (EncodedValueComparator) other;
+
+    int len = Math.min(
+        encodedReferenceKey.length,
+        otherEncodedValueComparator.encodedReferenceKey.length);
+
+    for (int i = 0; i < len; i++) {
+      byte b1 = encodedReferenceKey[i];
+      byte b2 = otherEncodedValueComparator.encodedReferenceKey[i];
+      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
+      if (result != 0) {
+        return ascending ? -result : result;
+      }
+    }
+    int result =
+        encodedReferenceKey.length - otherEncodedValueComparator.encodedReferenceKey.length;
+    return ascending ? -result : result;
+  }
+
+  @Override
+  public int compare(byte[] first, byte[] second) {
+    int len = Math.min(first.length, second.length);
+    for (int i = 0; i < len; i++) {
+      byte b1 = first[i];
+      byte b2 = second[i];
+      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
+      if (result != 0) {
+        return ascending ? result : -result;
+      }
+    }
+    int result = first.length - second.length;
+    return ascending ? result : -result;
+  }
+
+  @Override
+  public int compareSerialized(
+      DataInputView firstSource,
+      DataInputView secondSource) throws IOException {
+    int lengthFirst = firstSource.readInt();
+    int lengthSecond = secondSource.readInt();
+
+    int len = Math.min(lengthFirst, lengthSecond);
+    for (int i = 0; i < len; i++) {
+      byte b1 = firstSource.readByte();
+      byte b2 = secondSource.readByte();
+      int result = (b1 < b2 ? -1 : (b1 == b2 ? 0 : 1));
+      if (result != 0) {
+        return ascending ? result : -result;
+      }
+    }
+
+    int result = lengthFirst - lengthSecond;
+    return ascending ? result : -result;
+  }
+
+  @Override
+  public boolean supportsNormalizedKey() {
+    // Disabled because normalized keys do not work with some coders,
+    // such as the AvroCoder.
+    return false;
+  }
+
+  @Override
+  public boolean supportsSerializationWithKeyNormalization() {
+    return false;
+  }
+
+  @Override
+  public int getNormalizeKeyLen() {
+    return Integer.MAX_VALUE;
+  }
+
+  @Override
+  public boolean isNormalizedKeyPrefixOnly(int keyBytes) {
+    return true;
+  }
+
+  @Override
+  public void putNormalizedKey(byte[] record, MemorySegment target, int offset, int numBytes) {
+    final int limit = offset + numBytes;
+
+    target.put(offset, record, 0, Math.min(numBytes, record.length));
+
+    offset += record.length;
+
+    // zero-pad the remainder so that shorter keys sort first
+    while (offset < limit) {
+      target.put(offset++, (byte) 0);
+    }
+  }
+
+  @Override
+  public void writeWithKeyNormalization(byte[] record, DataOutputView target) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public byte[] readWithKeyDenormalization(byte[] reuse, DataInputView source) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean invertNormalizedKey() {
+    return !ascending;
+  }
+
+  @Override
+  public TypeComparator<byte[]> duplicate() {
+    return new EncodedValueComparator(ascending);
+  }
+
+  @Override
+  public int extractKeys(Object record, Object[] target, int index) {
+    target[index] = record;
+    return 1;
+  }
+
+  @Override
+  public TypeComparator[] getFlatComparators() {
+    return new TypeComparator[] { this.duplicate() };
+  }
+}
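The comparator only ever sees coder-encoded bytes, so key order is the byte-wise order of the encodings, not the natural order of the decoded values. A small hypothetical check, assuming CoderUtils.encodeToByteArray from the Beam SDK:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.CoderUtils;

    class EncodedValueComparatorExample {
      static void demo() throws Exception {
        EncodedValueComparator comparator = new EncodedValueComparator(true /* ascending */);

        byte[] a = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "apple");
        byte[] b = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "banana");

        // Negative result: "apple" sorts before "banana" in the encoded ordering too,
        // because UTF-8 preserves the lexicographic order of these ASCII strings.
        int cmp = comparator.compare(a, b);
        assert cmp < 0;
      }
    }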
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
new file mode 100644
index 0000000..41db61e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.types;
+
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+/**
+ * {@link TypeSerializer} for values that were encoded using a {@link Coder}.
+ */
+public final class EncodedValueSerializer extends TypeSerializer<byte[]> {
+
+  private static final long serialVersionUID = 1L;
+
+  private static final byte[] EMPTY = new byte[0];
+
+  @Override
+  public boolean isImmutableType() {
+    return true;
+  }
+
+  @Override
+  public byte[] createInstance() {
+    return EMPTY;
+  }
+
+  @Override
+  public byte[] copy(byte[] from) {
+    return from;
+  }
+
+  @Override
+  public byte[] copy(byte[] from, byte[] reuse) {
+    return copy(from);
+  }
+
+  @Override
+  public int getLength() {
+    // variable-length records; Flink uses -1 to signal this
+    return -1;
+  }
+
+  @Override
+  public void serialize(byte[] record, DataOutputView target) throws IOException {
+    if (record == null) {
+      throw new IllegalArgumentException("The record must not be null.");
+    }
+
+    // length-prefixed layout; EncodedValueComparator#compareSerialized relies on this
+    final int len = record.length;
+    target.writeInt(len);
+    target.write(record);
+  }
+
+  @Override
+  public byte[] deserialize(DataInputView source) throws IOException {
+    final int len = source.readInt();
+    byte[] result = new byte[len];
+    source.readFully(result);
+    return result;
+  }
+
+  @Override
+  public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException {
+    return deserialize(source);
+  }
+
+  @Override
+  public void copy(DataInputView source, DataOutputView target) throws IOException {
+    final int len = source.readInt();
+    target.writeInt(len);
+    target.write(source, len);
+  }
+
+  @Override
+  public boolean canEqual(Object obj) {
+    return obj instanceof EncodedValueSerializer;
+  }
+
+  @Override
+  public int hashCode() {
+    return this.getClass().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj instanceof EncodedValueSerializer;
+  }
+
+  @Override
+  public TypeSerializer<byte[]> duplicate() {
+    return this;
+  }
+}
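Putting the last two classes together: the serializer's length-prefixed layout is exactly what compareSerialized expects to find, so sorted shuffles can order keys without ever decoding them. A rough, self-contained sketch (class and method names are ours):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    import org.apache.flink.core.memory.DataOutputViewStreamWrapper;

    class SerializedCompareExample {
      // Serialize one encoded key with the length-prefixed format.
      static byte[] write(byte[] encodedKey) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        new EncodedValueSerializer().serialize(encodedKey, new DataOutputViewStreamWrapper(out));
        return out.toByteArray();
      }

      // Compare two serialized keys without decoding them.
      static int compareSerialized(byte[] first, byte[] second) throws IOException {
        EncodedValueComparator comparator = new EncodedValueComparator(true);
        return comparator.compareSerialized(
            new DataInputViewStreamWrapper(new ByteArrayInputStream(first)),
            new DataInputViewStreamWrapper(new ByteArrayInputStream(second)));
      }
    }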
