http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java deleted file mode 100644 index 8f50105..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.Pipeline.PipelineExecutionException; -import org.apache.beam.sdk.PipelineResult; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.PipelineOptionsValidator; -import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.util.UserCodeException; - -/** - * Test Flink runner. 
- */ -public class TestFlinkRunner extends PipelineRunner<PipelineResult> { - - private FlinkRunner delegate; - - private TestFlinkRunner(FlinkPipelineOptions options) { - // We use [auto] for testing since this will make it pick up the Testing ExecutionEnvironment - options.setFlinkMaster("[auto]"); - this.delegate = FlinkRunner.fromOptions(options); - } - - public static TestFlinkRunner fromOptions(PipelineOptions options) { - FlinkPipelineOptions flinkOptions = - PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); - return new TestFlinkRunner(flinkOptions); - } - - public static TestFlinkRunner create(boolean streaming) { - FlinkPipelineOptions flinkOptions = PipelineOptionsFactory.as(FlinkPipelineOptions.class); - flinkOptions.setRunner(TestFlinkRunner.class); - flinkOptions.setStreaming(streaming); - return TestFlinkRunner.fromOptions(flinkOptions); - } - - @Override - public PipelineResult run(Pipeline pipeline) { - try { - return delegate.run(pipeline); - } catch (Throwable t) { - // Special case hack to pull out assertion errors from PAssert; instead there should - // probably be a better story along the lines of UserCodeException. - UserCodeException innermostUserCodeException = null; - Throwable current = t; - for (; current.getCause() != null; current = current.getCause()) { - if (current instanceof UserCodeException) { - innermostUserCodeException = ((UserCodeException) current); - } - } - if (innermostUserCodeException != null) { - current = innermostUserCodeException.getCause(); - } - if (current instanceof AssertionError) { - throw (AssertionError) current; - } - throw new PipelineExecutionException(current); - } - } - - public PipelineOptions getPipelineOptions() { - return delegate.getPipelineOptions(); - } -} - -
/**
 * The translation mode of the Beam Pipeline.
 *
 * <p>Has one constant per Flink execution mode: batch and streaming.
 */
enum TranslationMode {

  /** Uses the batch mode of Flink. */
  BATCH,

  /** Uses the streaming mode of Flink. */
  STREAMING

}
- */ -package org.apache.beam.runners.flink; http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java deleted file mode 100644 index fb2493b..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.functions; - -import org.apache.beam.runners.core.AggregatorFactory; -import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.flink.api.common.functions.RuntimeContext; - -/** - * A {@link AggregatorFactory} for the Flink Batch Runner. - */ -public class FlinkAggregatorFactory implements AggregatorFactory{ - - private final RuntimeContext runtimeContext; - - public FlinkAggregatorFactory(RuntimeContext runtimeContext) { - this.runtimeContext = runtimeContext; - } - - @Override - public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName, - Combine.CombineFn<InputT, AccumT, OutputT> combine) { - @SuppressWarnings("unchecked") - SerializableFnAggregatorWrapper<InputT, OutputT> result = - (SerializableFnAggregatorWrapper<InputT, OutputT>) - runtimeContext.getAccumulator(aggregatorName); - - if (result == null) { - result = new SerializableFnAggregatorWrapper<>(combine); - runtimeContext.addAccumulator(aggregatorName, result); - } - return result; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java deleted file mode 100644 index 447b1e5..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignContext.java +++ /dev/null @@ 
-1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.collect.Iterables; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.joda.time.Instant; - -/** - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn.AssignContext} for - * Flink functions. 
- */ -class FlinkAssignContext<InputT, W extends BoundedWindow> - extends WindowFn<InputT, W>.AssignContext { - private final WindowedValue<InputT> value; - - FlinkAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { - fn.super(); - checkArgument( - Iterables.size(value.getWindows()) == 1, - String.format( - "%s passed to window assignment must be in a single window, but it was in %s: %s", - WindowedValue.class.getSimpleName(), - Iterables.size(value.getWindows()), - value.getWindows())); - this.value = value; - } - - @Override - public InputT element() { - return value.getValue(); - } - - @Override - public Instant timestamp() { - return value.getTimestamp(); - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(value.getWindows()); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java deleted file mode 100644 index c3a5095..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAssignWindows.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import java.util.Collection; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.util.Collector; - -/** - * Flink {@link FlatMapFunction} for implementing - * {@link org.apache.beam.sdk.transforms.windowing.Window.Assign}. - */ -public class FlinkAssignWindows<T, W extends BoundedWindow> - implements FlatMapFunction<WindowedValue<T>, WindowedValue<T>> { - - private final WindowFn<T, W> windowFn; - - public FlinkAssignWindows(WindowFn<T, W> windowFn) { - this.windowFn = windowFn; - } - - @Override - public void flatMap( - WindowedValue<T> input, Collector<WindowedValue<T>> collector) throws Exception { - Collection<W> windows = windowFn.assignWindows(new FlinkAssignContext<>(windowFn, input)); - for (W window: windows) { - collector.collect( - WindowedValue.of(input.getValue(), input.getTimestamp(), window, input.getPane())); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java deleted file mode 100644 index 51582af..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.functions; - -import java.util.Collections; -import java.util.Map; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.RichMapPartitionFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; - -/** - * Encapsulates a {@link DoFn} - * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}. - * - * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index - * and must tag all outputs with the output number. Afterwards a filter will filter out - * those elements that are not to be in a specific output. 
- */ -public class FlinkDoFnFunction<InputT, OutputT> - extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> { - - private final SerializedPipelineOptions serializedOptions; - - private final DoFn<InputT, OutputT> doFn; - private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - - private final WindowingStrategy<?, ?> windowingStrategy; - - private final Map<TupleTag<?>, Integer> outputMap; - private final TupleTag<OutputT> mainOutputTag; - - private transient DoFnInvoker<InputT, OutputT> doFnInvoker; - - public FlinkDoFnFunction( - DoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ?> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - PipelineOptions options, - Map<TupleTag<?>, Integer> outputMap, - TupleTag<OutputT> mainOutputTag) { - - this.doFn = doFn; - this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(options); - this.windowingStrategy = windowingStrategy; - this.outputMap = outputMap; - this.mainOutputTag = mainOutputTag; - - } - - @Override - public void mapPartition( - Iterable<WindowedValue<InputT>> values, - Collector<WindowedValue<OutputT>> out) throws Exception { - - RuntimeContext runtimeContext = getRuntimeContext(); - - DoFnRunners.OutputManager outputManager; - if (outputMap == null) { - outputManager = new FlinkDoFnFunction.DoFnOutputManager(out); - } else { - // it has some additional outputs - outputManager = - new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap); - } - - DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner( - serializedOptions.getPipelineOptions(), doFn, - new FlinkSideInputReader(sideInputs, runtimeContext), - outputManager, - mainOutputTag, - // see SimpleDoFnRunner, just use it to limit number of additional outputs - Collections.<TupleTag<?>>emptyList(), - new FlinkNoOpStepContext(), - new FlinkAggregatorFactory(runtimeContext), - windowingStrategy); - - 
doFnRunner.startBundle(); - - for (WindowedValue<InputT> value : values) { - doFnRunner.processElement(value); - } - - doFnRunner.finishBundle(); - } - - @Override - public void open(Configuration parameters) throws Exception { - doFnInvoker = DoFnInvokers.invokerFor(doFn); - doFnInvoker.invokeSetup(); - } - - @Override - public void close() throws Exception { - doFnInvoker.invokeTeardown(); - } - - static class DoFnOutputManager - implements DoFnRunners.OutputManager { - - private Collector collector; - - DoFnOutputManager(Collector collector) { - this.collector = collector; - } - - @Override - @SuppressWarnings("unchecked") - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - collector.collect(output); - } - } - - static class MultiDoFnOutputManager - implements DoFnRunners.OutputManager { - - private Collector<WindowedValue<RawUnionValue>> collector; - private Map<TupleTag<?>, Integer> outputMap; - - MultiDoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector, - Map<TupleTag<?>, Integer> outputMap) { - this.collector = collector; - this.outputMap = outputMap; - } - - @Override - public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { - collector.collect(WindowedValue.of(new RawUnionValue(outputMap.get(tag), output.getValue()), - output.getTimestamp(), output.getWindows(), output.getPane())); - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java deleted file mode 100644 index 26fd0b4..0000000 --- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import 
/**
 * Special version of {@link FlinkReduceFunction} that supports merging windows. This
 * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the
 * same behaviour as {@code MergeOverlappingIntervalWindows}.
 *
 * <p>This is different from the pair of functions for the non-merging windows case
 * in that we cannot do combining before the shuffle because elements would not
 * yet be in their correct windows for side-input access.
 */
public class FlinkMergingNonShuffleReduceFunction<
    K, InputT, AccumT, OutputT, W extends IntervalWindow>
    extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> {

  private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn;

  private final WindowingStrategy<?, W> windowingStrategy;

  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;

  private final SerializedPipelineOptions serializedOptions;

  /**
   * @param keyedCombineFn the per-key combine function applied to each merged window
   * @param windowingStrategy supplies the {@link OutputTimeFn} used for output timestamps
   * @param sideInputs side inputs made readable through a {@link FlinkSideInputReader}
   * @param pipelineOptions pipeline options, stored in serialized form
   */
  public FlinkMergingNonShuffleReduceFunction(
      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
      WindowingStrategy<?, W> windowingStrategy,
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
      PipelineOptions pipelineOptions) {

    this.combineFn = keyedCombineFn;

    this.windowingStrategy = windowingStrategy;
    this.sideInputs = sideInputs;

    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);

  }

  /**
   * Combines {@code elements} per merged window and emits one output per window.
   *
   * <p>Steps: explode every element into one value per window, sort by the window's
   * maximum timestamp, merge intersecting windows, then accumulate runs of equal windows
   * and emit each result with {@link PaneInfo#NO_FIRING}.
   *
   * <p>The key of the first sorted element is used for every output. The method calls
   * {@code iterator.next()} and {@code elements.get(0)} unconditionally, so an empty
   * {@code elements} makes it throw.
   * NOTE(review): presumably the caller only invokes this with non-empty, single-key
   * groups - confirm.
   */
  @Override
  public void reduce(
      Iterable<WindowedValue<KV<K, InputT>>> elements,
      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {

    PipelineOptions options = serializedOptions.getPipelineOptions();

    FlinkSideInputReader sideInputReader =
        new FlinkSideInputReader(sideInputs, getRuntimeContext());

    PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
        PerKeyCombineFnRunners.create(combineFn);

    @SuppressWarnings("unchecked")
    OutputTimeFn<? super BoundedWindow> outputTimeFn =
        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();

    // get all elements so that we can sort them, has to fit into
    // memory
    // this seems very imprudent, but correct, for now
    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
      // one entry per (value, window) pair, so getOnlyElement below is safe
      for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) {
        sortedInput.add(exploded);
      }
    }
    // order by the maximum timestamp (i.e. the end) of each element's single window
    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() {
      @Override
      public int compare(
          WindowedValue<KV<K, InputT>> o1,
          WindowedValue<KV<K, InputT>> o2) {
        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
            .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
      }
    });

    // merge windows, we have to do it in an extra pre-processing step and
    // can't do it as we go since the window of early elements would not
    // be correct when calling the CombineFn
    mergeWindow(sortedInput);

    // iterate over the elements that are sorted by window timestamp
    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();

    // create accumulator using the first elements key
    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
    K key = currentValue.getValue().getKey();
    IntervalWindow currentWindow =
        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
    InputT firstValue = currentValue.getValue().getValue();
    AccumT accumulator =
        combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows());
    accumulator = combineFnRunner.addInput(key, accumulator, firstValue,
        options, sideInputReader, currentValue.getWindows());

    // we use this to keep track of the timestamps assigned by the OutputTimeFn
    Instant windowTimestamp =
        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);

    while (iterator.hasNext()) {
      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
      IntervalWindow nextWindow =
          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());

      if (currentWindow.equals(nextWindow)) {
        // continue accumulating and merge windows

        InputT value = nextValue.getValue().getValue();
        // currentValue still refers to the first element of this window; its windows
        // equal nextValue's here, so either could be passed
        accumulator = combineFnRunner.addInput(key, accumulator, value,
            options, sideInputReader, currentValue.getWindows());

        windowTimestamp = outputTimeFn.combine(
            windowTimestamp,
            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow));

      } else {
        // emit the value that we currently have
        out.collect(
            WindowedValue.of(
                KV.of(key, combineFnRunner.extractOutput(key, accumulator,
                    options, sideInputReader, currentValue.getWindows())),
                windowTimestamp,
                currentWindow,
                PaneInfo.NO_FIRING));

        // start a fresh accumulator and timestamp for the next window
        currentWindow = nextWindow;
        currentValue = nextValue;
        InputT value = nextValue.getValue().getValue();
        accumulator = combineFnRunner.createAccumulator(key,
            options, sideInputReader, currentValue.getWindows());
        accumulator = combineFnRunner.addInput(key, accumulator, value,
            options, sideInputReader, currentValue.getWindows());
        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
      }

    }

    // emit the final accumulator
    out.collect(
        WindowedValue.of(
            KV.of(key, combineFnRunner.extractOutput(key, accumulator,
                options, sideInputReader, currentValue.getWindows())),
            windowTimestamp,
            currentWindow,
            PaneInfo.NO_FIRING));
  }

  /**
   * Merge windows. This assumes that the list of elements is sorted by window-end timestamp.
   * This replaces windows in the input list.
   *
   * <p>Walks the list once, growing a running window with {@code span} while the next
   * element's window intersects it. When an element does not intersect, every element of
   * the finished run is rewritten to carry the merged window.
   *
   * <p>NOTE(review): a finished run is never revisited, so with ordering by window end a
   * later window that starts early enough to overlap an already finished run is not
   * merged into it. Confirm this cannot happen for the window functions this is used
   * with.
   *
   * @param elements non-empty list of single-window values; modified in place
   */
  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
    // index of the first element of the run currently being merged
    int currentStart = 0;
    IntervalWindow currentWindow =
        (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows());

    for (int i = 1; i < elements.size(); i++) {
      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
      IntervalWindow nextWindow =
          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
      if (currentWindow.intersects(nextWindow)) {
        // we continue
        currentWindow = currentWindow.span(nextWindow);
      } else {
        // retrofit the merged window to all windows up to "currentStart"
        for (int j = i - 1; j >= currentStart; j--) {
          WindowedValue<KV<K, InputT>> value = elements.get(j);
          elements.set(
              j,
              WindowedValue.of(
                  value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
        }
        currentStart = i;
        currentWindow = nextWindow;
      }
    }
    // a last run of a single element already carries its own window, so it is skipped
    if (currentStart < elements.size() - 1) {
      // we have to retrofit the last batch
      for (int j = elements.size() - 1; j >= currentStart; j--) {
        WindowedValue<KV<K, InputT>> value = elements.get(j);
        elements.set(
            j,
            WindowedValue.of(
                value.getValue(), value.getTimestamp(), currentWindow, value.getPane()));
      }
    }
  }

}
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. 
This - * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the - * same behaviour as {@code MergeOverlappingIntervalWindows}. - */ -public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow> - extends FlinkPartialReduceFunction<K, InputT, AccumT, W> { - - public FlinkMergingPartialReduceFunction( - CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, - WindowingStrategy<?, W> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - PipelineOptions pipelineOptions) { - super(combineFn, windowingStrategy, sideInputs, pipelineOptions); - } - - @Override - public void combine( - Iterable<WindowedValue<KV<K, InputT>>> elements, - Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { - - PipelineOptions options = serializedOptions.getPipelineOptions(); - - FlinkSideInputReader sideInputReader = - new FlinkSideInputReader(sideInputs, getRuntimeContext()); - - PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); - - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - - // get all elements so that we can sort them, has to fit into - // memory - // this seems very unprudent, but correct, for now - List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, InputT>> inputValue : elements) { - for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { - sortedInput.add(exploded); - } - } - Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { - @Override - public int compare( - WindowedValue<KV<K, InputT>> o1, - WindowedValue<KV<K, InputT>> o2) { - return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() - .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); - } - }); - - // merge windows, we have to do it in an extra pre-processing step and - // can't do it as we go since the window of early elements would not - // be correct when calling the CombineFn - mergeWindow(sortedInput); - - // iterate over the elements that are sorted by window timestamp - final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); - - // create accumulator using the first elements key - WindowedValue<KV<K, InputT>> currentValue = iterator.next(); - K key = currentValue.getValue().getKey(); - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); - InputT firstValue = currentValue.getValue().getValue(); - AccumT accumulator = combineFnRunner.createAccumulator(key, - options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, firstValue, - options, sideInputReader, currentValue.getWindows()); - - // we use this to keep track of the timestamps assigned by the OutputTimeFn - Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); - - while (iterator.hasNext()) { - WindowedValue<KV<K, InputT>> nextValue = iterator.next(); - 
IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - - if (currentWindow.equals(nextWindow)) { - // continue accumulating and merge windows - - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - - windowTimestamp = outputTimeFn.combine( - windowTimestamp, - outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); - - } else { - // emit the value that we currently have - out.collect( - WindowedValue.of( - KV.of(key, accumulator), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); - - currentWindow = nextWindow; - currentValue = nextValue; - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.createAccumulator(key, - options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); - } - } - - // emit the final accumulator - out.collect( - WindowedValue.of( - KV.of(key, accumulator), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); - } - - /** - * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. - * This replaces windows in the input list. 
- */ - private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { - int currentStart = 0; - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); - - for (int i = 1; i < elements.size(); i++) { - WindowedValue<KV<K, InputT>> nextValue = elements.get(i); - IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - if (currentWindow.intersects(nextWindow)) { - // we continue - currentWindow = currentWindow.span(nextWindow); - } else { - // retrofit the merged window to all windows up to "currentStart" - for (int j = i - 1; j >= currentStart; j--) { - WindowedValue<KV<K, InputT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - currentStart = i; - currentWindow = nextWindow; - } - } - if (currentStart < elements.size() - 1) { - // we have to retrofit the last batch - for (int j = elements.size() - 1; j >= currentStart; j--) { - WindowedValue<KV<K, InputT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java deleted file mode 100644 index 84b3adc..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * Special version of {@link FlinkReduceFunction} that supports merging windows. 
This - * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the - * same behaviour as {@code MergeOverlappingIntervalWindows}. - */ -public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow> - extends FlinkReduceFunction<K, AccumT, OutputT, W> { - - public FlinkMergingReduceFunction( - CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, - WindowingStrategy<?, W> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - PipelineOptions pipelineOptions) { - super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions); - } - - @Override - public void reduce( - Iterable<WindowedValue<KV<K, AccumT>>> elements, - Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - - PipelineOptions options = serializedOptions.getPipelineOptions(); - - FlinkSideInputReader sideInputReader = - new FlinkSideInputReader(sideInputs, getRuntimeContext()); - - PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); - - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - - // get all elements so that we can sort them, has to fit into - // memory - // this seems very unprudent, but correct, for now - ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, AccumT>> inputValue : elements) { - for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) { - sortedInput.add(exploded); - } - } - Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() { - @Override - public int compare( - WindowedValue<KV<K, AccumT>> o1, - WindowedValue<KV<K, AccumT>> o2) { - return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() - .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); - } - }); - - // merge windows, we have to do it in an extra pre-processing step and - // can't do it as we go since the window of early elements would not - // be correct when calling the CombineFn - mergeWindow(sortedInput); - - // iterate over the elements that are sorted by window timestamp - final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator(); - - // get the first accumulator - WindowedValue<KV<K, AccumT>> currentValue = iterator.next(); - K key = currentValue.getValue().getKey(); - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); - AccumT accumulator = currentValue.getValue().getValue(); - - // we use this to keep track of the timestamps assigned by the OutputTimeFn, - // in FlinkPartialReduceFunction we already merge the timestamps assigned - // to individual elements, here we just merge them - List<Instant> windowTimestamps = new ArrayList<>(); - windowTimestamps.add(currentValue.getTimestamp()); - - while (iterator.hasNext()) { - WindowedValue<KV<K, AccumT>> nextValue = iterator.next(); - IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - - if 
(nextWindow.equals(currentWindow)) { - // continue accumulating and merge windows - - accumulator = combineFnRunner.mergeAccumulators( - key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), - options, sideInputReader, currentValue.getWindows()); - - windowTimestamps.add(nextValue.getTimestamp()); - } else { - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - outputTimeFn.merge(currentWindow, windowTimestamps), - currentWindow, - PaneInfo.NO_FIRING)); - - windowTimestamps.clear(); - - currentWindow = nextWindow; - currentValue = nextValue; - accumulator = nextValue.getValue().getValue(); - windowTimestamps.add(nextValue.getTimestamp()); - } - } - - // emit the final accumulator - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - outputTimeFn.merge(currentWindow, windowTimestamps), - currentWindow, - PaneInfo.NO_FIRING)); - } - - /** - * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. - * This replaces windows in the input list. 
- */ - private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) { - int currentStart = 0; - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); - - for (int i = 1; i < elements.size(); i++) { - WindowedValue<KV<K, AccumT>> nextValue = elements.get(i); - IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - if (currentWindow.intersects(nextWindow)) { - // we continue - currentWindow = currentWindow.span(nextWindow); - } else { - // retrofit the merged window to all windows up to "currentStart" - for (int j = i - 1; j >= currentStart; j--) { - WindowedValue<KV<K, AccumT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - currentStart = i; - currentWindow = nextWindow; - } - } - if (currentStart < elements.size() - 1) { - // we have to retrofit the last batch - for (int j = elements.size() - 1; j >= currentStart; j--) { - WindowedValue<KV<K, AccumT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java deleted file mode 100644 index 9071cc5..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.util.Collector; - -/** - * A {@link FlatMapFunction} function that filters out those elements that don't belong in this - * output. We need this to implement MultiOutput ParDo functions in combination with - * {@link FlinkDoFnFunction}. 
- */
public class FlinkMultiOutputPruningFunction<T>
    implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> {

  /** Union tag of the single output this instance lets through. */
  private final int ourOutputTag;

  /**
   * @param ourOutputTag union tag of the elements that should be forwarded
   */
  public FlinkMultiOutputPruningFunction(int ourOutputTag) {
    this.ourOutputTag = ourOutputTag;
  }

  /**
   * Forwards the unwrapped value of {@code windowedValue} when its union tag equals
   * {@code ourOutputTag}; every other element is dropped.
   */
  @Override
  @SuppressWarnings("unchecked")
  public void flatMap(
      WindowedValue<RawUnionValue> windowedValue,
      Collector<WindowedValue<T>> collector) throws Exception {
    if (windowedValue.getValue().getUnionTag() != ourOutputTag) {
      // belongs to a different output
      return;
    }
    // keep the windowing metadata, replace the union wrapper by the wrapped value
    WindowedValue<T> unwrapped =
        (WindowedValue<T>) windowedValue.withValue(windowedValue.getValue().getValue());
    collector.collect(unwrapped);
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java deleted file mode 100644 index 847a00a..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import java.io.IOException; -import org.apache.beam.runners.core.ExecutionContext.StepContext; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; -
/**
 * A {@link StepContext} for Flink Batch Runner execution.
 *
 * <p>Every method is a no-op: the name and internals accessors return {@code null} and the
 * notification and write methods do nothing.
 */
public class FlinkNoOpStepContext implements StepContext {

  /** Always returns {@code null}. */
  @Override
  public String getStepName() {
    return null;
  }

  /** Always returns {@code null}. */
  @Override
  public String getTransformName() {
    return null;
  }

  /** Does nothing. */
  @Override
  public void noteOutput(WindowedValue<?> output) {
    // intentionally empty
  }

  /** Does nothing. */
  @Override
  public void noteOutput(TupleTag<?> tag, WindowedValue<?> output) {
    // intentionally empty
  }

  /** Does nothing; the given data is not written anywhere. */
  @Override
  public <T, W extends BoundedWindow> void writePCollectionViewData(
      TupleTag<?> tag,
      Iterable<WindowedValue<T>> data,
      Coder<Iterable<WindowedValue<T>>> dataCoder,
      W window,
      Coder<W> windowCoder) throws IOException {
    // intentionally empty
  }

  /** Always returns {@code null}. */
  @Override
  public StateInternals<?> stateInternals() {
    return null;
  }

  /** Always returns {@code null}. */
  @Override
  public TimerInternals timerInternals() {
    return null;
  }
}
- http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java deleted file mode 100644 index 1d1ff9f..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ /dev/null @@ -1,172 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.runners.flink.translation.functions; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.api.common.functions.RichGroupCombineFunction; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * This is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} - * on Flink. The second part is {@link FlinkReduceFunction}. This function performs a local - * combine step before shuffling while the latter does the final combination after a shuffle. - * - * <p>The inputs to {@link #combine(Iterable, Collector)} are elements of the same key but - * for different windows. We have to ensure that we only combine elements of matching - * windows.
- */
public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWindow>
    extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, AccumT>>> {

  /** Combine function used to create accumulators and add inputs to them. */
  protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn;

  /** Windowing strategy; supplies the {@code OutputTimeFn} for output timestamps. */
  protected final WindowingStrategy<?, W> windowingStrategy;

  /** Pipeline options in the wrapper form this function stores them in. */
  protected final SerializedPipelineOptions serializedOptions;

  /** Side inputs handed to the {@code FlinkSideInputReader}. */
  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;

  public FlinkPartialReduceFunction(
      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
      WindowingStrategy<?, W> windowingStrategy,
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
      PipelineOptions pipelineOptions) {
    this.combineFn = combineFn;
    this.windowingStrategy = windowingStrategy;
    this.sideInputs = sideInputs;
    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
  }

  /**
   * Pre-combines the given elements window by window and emits one accumulator for every
   * run of consecutive elements (in window max-timestamp order) that share the same window.
   *
   * @param elements the input values; their key is taken from the first element
   * @param out receives the {@code KV<key, accumulator>} values
   */
  @Override
  public void combine(
      Iterable<WindowedValue<KV<K, InputT>>> elements,
      Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {

    PipelineOptions options = serializedOptions.getPipelineOptions();

    FlinkSideInputReader sideInputReader =
        new FlinkSideInputReader(sideInputs, getRuntimeContext());

    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
        PerKeyCombineFnRunners.create(combineFn);

    @SuppressWarnings("unchecked")
    OutputTimeFn<? super BoundedWindow> outputTimeFn =
        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();

    // Everything is buffered in memory so it can be sorted: one entry per (element, window).
    ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
    for (WindowedValue<KV<K, InputT>> inputValue : elements) {
      for (WindowedValue<KV<K, InputT>> perWindow : inputValue.explodeWindows()) {
        sortedInput.add(perWindow);
      }
    }

    // order the entries by the max timestamp of their (single) window
    Comparator<WindowedValue<KV<K, InputT>>> byWindowMaxTimestamp =
        new Comparator<WindowedValue<KV<K, InputT>>>() {
          @Override
          public int compare(
              WindowedValue<KV<K, InputT>> o1,
              WindowedValue<KV<K, InputT>> o2) {
            return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
                .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
          }
        };
    Collections.sort(sortedInput, byWindowMaxTimestamp);

    final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator();

    // the first element provides the key and opens the first accumulator
    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
    K key = currentValue.getValue().getKey();
    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
    AccumT accumulator = combineFnRunner.createAccumulator(key,
        options, sideInputReader, currentValue.getWindows());
    accumulator = combineFnRunner.addInput(key, accumulator, currentValue.getValue().getValue(),
        options, sideInputReader, currentValue.getWindows());

    // running output timestamp of the current window, as combined by the OutputTimeFn
    Instant windowTimestamp =
        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow);

    while (iterator.hasNext()) {
      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());
      InputT value = nextValue.getValue().getValue();

      if (!nextWindow.equals(currentWindow)) {
        // window changed: flush what we have and open a new accumulator
        out.collect(
            WindowedValue.of(
                KV.of(key, accumulator),
                windowTimestamp,
                currentWindow,
                PaneInfo.NO_FIRING));

        currentWindow = nextWindow;
        currentValue = nextValue;
        accumulator = combineFnRunner.createAccumulator(key,
            options, sideInputReader, currentValue.getWindows());
        accumulator = combineFnRunner.addInput(key, accumulator, value,
            options, sideInputReader, currentValue.getWindows());
        windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
        continue;
      }

      // same window: keep accumulating
      accumulator = combineFnRunner.addInput(key, accumulator, value,
          options, sideInputReader, currentValue.getWindows());
      Instant nextTimestamp =
          outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
      windowTimestamp = outputTimeFn.combine(windowTimestamp, nextTimestamp);
    }

    // flush the accumulator of the last window
    out.collect(
        WindowedValue.of(
            KV.of(key, accumulator),
            windowTimestamp,
            currentWindow,
            PaneInfo.NO_FIRING));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java deleted file mode 100644 index 3e4f742..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership.
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * This is the second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} - * on Flink; the first part is {@link FlinkPartialReduceFunction}.
This function performs the final
 * combination of the pre-combined values after a shuffle.
 *
 * <p>The input to {@link #reduce(Iterable, Collector)} are elements of the same key but
 * for different windows. We have to ensure that we only combine elements of matching
 * windows.
 */
public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
    extends RichGroupReduceFunction<WindowedValue<KV<K, AccumT>>, WindowedValue<KV<K, OutputT>>> {

  /** Combine function used to merge accumulators and extract the output. */
  protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn;

  /** Windowing strategy; supplies the {@code OutputTimeFn} for output timestamps. */
  protected final WindowingStrategy<?, W> windowingStrategy;

  /** Side inputs handed to the {@code FlinkSideInputReader}. */
  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;

  /** Pipeline options in the wrapper form this function stores them in. */
  protected final SerializedPipelineOptions serializedOptions;

  public FlinkReduceFunction(
      CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
      WindowingStrategy<?, W> windowingStrategy,
      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
      PipelineOptions pipelineOptions) {
    this.combineFn = keyedCombineFn;
    this.windowingStrategy = windowingStrategy;
    this.sideInputs = sideInputs;
    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
  }

  /**
   * Merges the given accumulators window by window and emits one output for every run of
   * consecutive elements (in window max-timestamp order) that share the same window.
   *
   * @param elements the pre-combined accumulators; their key is taken from the first element
   * @param out receives the {@code KV<key, output>} values
   */
  @Override
  public void reduce(
      Iterable<WindowedValue<KV<K, AccumT>>> elements,
      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {

    PipelineOptions options = serializedOptions.getPipelineOptions();

    FlinkSideInputReader sideInputReader =
        new FlinkSideInputReader(sideInputs, getRuntimeContext());

    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
        PerKeyCombineFnRunners.create(combineFn);

    @SuppressWarnings("unchecked")
    OutputTimeFn<? super BoundedWindow> outputTimeFn =
        (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn();

    // Everything is buffered in memory so it can be sorted: one entry per (element, window).
    ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
    for (WindowedValue<KV<K, AccumT>> inputValue : elements) {
      for (WindowedValue<KV<K, AccumT>> perWindow : inputValue.explodeWindows()) {
        sortedInput.add(perWindow);
      }
    }

    // order the entries by the max timestamp of their (single) window
    Comparator<WindowedValue<KV<K, AccumT>>> byWindowMaxTimestamp =
        new Comparator<WindowedValue<KV<K, AccumT>>>() {
          @Override
          public int compare(
              WindowedValue<KV<K, AccumT>> o1,
              WindowedValue<KV<K, AccumT>> o2) {
            return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
                .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
          }
        };
    Collections.sort(sortedInput, byWindowMaxTimestamp);

    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator();

    // the first element provides the key and the initial accumulator
    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
    K key = currentValue.getValue().getKey();
    BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
    AccumT accumulator = currentValue.getValue().getValue();

    // Timestamps of all accumulators seen for the current window; they are merged by the
    // OutputTimeFn when the window is emitted.
    List<Instant> windowTimestamps = new ArrayList<>();
    windowTimestamps.add(currentValue.getTimestamp());

    while (iterator.hasNext()) {
      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
      BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows());

      if (!nextWindow.equals(currentWindow)) {
        // window changed: emit the output of the window we just finished
        OutputT finished = combineFnRunner.extractOutput(key, accumulator,
            options, sideInputReader, currentValue.getWindows());
        out.collect(
            WindowedValue.of(
                KV.of(key, finished),
                outputTimeFn.merge(currentWindow, windowTimestamps),
                currentWindow,
                PaneInfo.NO_FIRING));

        windowTimestamps.clear();

        // start over with the accumulator of the next window
        currentWindow = nextWindow;
        currentValue = nextValue;
        accumulator = nextValue.getValue().getValue();
        windowTimestamps.add(nextValue.getTimestamp());
        continue;
      }

      // same window: fold the next accumulator into the running one
      accumulator = combineFnRunner.mergeAccumulators(
          key, ImmutableList.of(accumulator, nextValue.getValue().getValue()),
          options, sideInputReader, currentValue.getWindows());
      windowTimestamps.add(nextValue.getTimestamp());
    }

    // emit the output of the last window
    OutputT last = combineFnRunner.extractOutput(key, accumulator,
        options, sideInputReader, currentValue.getWindows());
    out.collect(
        WindowedValue.of(
            KV.of(key, last),
            outputTimeFn.merge(currentWindow, windowTimestamps),
            currentWindow,
            PaneInfo.NO_FIRING));
  }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java deleted file mode 100644 index c317182..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.RuntimeContext; - -/** - * A {@link SideInputReader} for the Flink Batch Runner. 
- */ -public class FlinkSideInputReader implements SideInputReader { - - private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs; - - private RuntimeContext runtimeContext; - - public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView, - RuntimeContext runtimeContext) { - sideInputs = new HashMap<>(); - for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) { - sideInputs.put(entry.getKey().getTagInternal(), entry.getValue()); - } - this.runtimeContext = runtimeContext; - } - - @Nullable - @Override - public <T> T get(PCollectionView<T> view, BoundedWindow window) { - checkNotNull(view, "View passed to sideInput cannot be null"); - TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal(); - checkNotNull( - sideInputs.get(tag), - "Side input for " + view + " not available."); - - Map<BoundedWindow, T> sideInputs = - runtimeContext.getBroadcastVariableWithInitializer( - tag.getId(), new SideInputInitializer<>(view)); - T result = sideInputs.get(window); - if (result == null) { - result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); - } - return result; - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - return sideInputs.containsKey(view.getTagInternal()); - } - - @Override - public boolean isEmpty() { - return sideInputs.isEmpty(); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java deleted file mode 100644 index c8193d2..0000000 --- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import static org.apache.flink.util.Preconditions.checkArgument; - -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import org.apache.beam.runners.core.DoFnRunner; -import org.apache.beam.runners.core.DoFnRunners; -import org.apache.beam.runners.core.InMemoryStateInternals; -import org.apache.beam.runners.core.InMemoryTimerInternals; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.StateNamespace; -import org.apache.beam.runners.core.StateNamespaces; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; -import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; -import 
org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.RichGroupReduceFunction; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * A {@link RichGroupReduceFunction} for stateful {@link ParDo} in Flink Batch Runner. - */ -public class FlinkStatefulDoFnFunction<K, V, OutputT> - extends RichGroupReduceFunction<WindowedValue<KV<K, V>>, WindowedValue<OutputT>> { - - private final DoFn<KV<K, V>, OutputT> dofn; - private final WindowingStrategy<?, ?> windowingStrategy; - private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - private final SerializedPipelineOptions serializedOptions; - private final Map<TupleTag<?>, Integer> outputMap; - private final TupleTag<OutputT> mainOutputTag; - private transient DoFnInvoker doFnInvoker; - - public FlinkStatefulDoFnFunction( - DoFn<KV<K, V>, OutputT> dofn, - WindowingStrategy<?, ?> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - PipelineOptions pipelineOptions, - Map<TupleTag<?>, Integer> outputMap, - TupleTag<OutputT> mainOutputTag) { - - this.dofn = dofn; - this.windowingStrategy = windowingStrategy; - this.sideInputs = sideInputs; - this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - this.outputMap = outputMap; - this.mainOutputTag = mainOutputTag; - } - - @Override - public void reduce( - Iterable<WindowedValue<KV<K, V>>> values, - Collector<WindowedValue<OutputT>> out) throws Exception { - RuntimeContext runtimeContext = getRuntimeContext(); - - DoFnRunners.OutputManager outputManager; - if (outputMap == null) { - 
outputManager = new FlinkDoFnFunction.DoFnOutputManager(out); - } else { - // it has some additional Outputs - outputManager = - new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap); - } - - final Iterator<WindowedValue<KV<K, V>>> iterator = values.iterator(); - - // get the first value, we need this for initializing the state internals with the key. - // we are guaranteed to have a first value, otherwise reduce() would not have been called. - WindowedValue<KV<K, V>> currentValue = iterator.next(); - final K key = currentValue.getValue().getKey(); - - final InMemoryStateInternals<K> stateInternals = InMemoryStateInternals.forKey(key); - - // Used with Batch, we know that all the data is available for this key. We can't use the - // timer manager from the context because it doesn't exist. So we create one and advance - // time to the end after processing all elements. - final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); - timerInternals.advanceProcessingTime(Instant.now()); - timerInternals.advanceSynchronizedProcessingTime(Instant.now()); - - DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner( - serializedOptions.getPipelineOptions(), dofn, - new FlinkSideInputReader(sideInputs, runtimeContext), - outputManager, - mainOutputTag, - // see SimpleDoFnRunner, just use it to limit number of additional outputs - Collections.<TupleTag<?>>emptyList(), - new FlinkNoOpStepContext() { - @Override - public StateInternals<?> stateInternals() { - return stateInternals; - } - @Override - public TimerInternals timerInternals() { - return timerInternals; - } - }, - new FlinkAggregatorFactory(runtimeContext), - windowingStrategy); - - doFnRunner.startBundle(); - - doFnRunner.processElement(currentValue); - while (iterator.hasNext()) { - currentValue = iterator.next(); - doFnRunner.processElement(currentValue); - } - - // Finish any pending windows by advancing the input watermark to infinity. 
- timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); - - // Finally, advance the processing time to infinity to fire any timers. - timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); - timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE); - - fireEligibleTimers(timerInternals, doFnRunner); - - doFnRunner.finishBundle(); - } - - private void fireEligibleTimers( - InMemoryTimerInternals timerInternals, DoFnRunner<KV<K, V>, OutputT> runner) - throws Exception { - - while (true) { - - TimerInternals.TimerData timer; - boolean hasFired = false; - - while ((timer = timerInternals.removeNextEventTimer()) != null) { - hasFired = true; - fireTimer(timer, runner); - } - while ((timer = timerInternals.removeNextProcessingTimer()) != null) { - hasFired = true; - fireTimer(timer, runner); - } - while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) { - hasFired = true; - fireTimer(timer, runner); - } - if (!hasFired) { - break; - } - } - } - - private void fireTimer( - TimerInternals.TimerData timer, DoFnRunner<KV<K, V>, OutputT> doFnRunner) { - StateNamespace namespace = timer.getNamespace(); - checkArgument(namespace instanceof StateNamespaces.WindowNamespace); - BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); - doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain()); - } - - @Override - public void open(Configuration parameters) throws Exception { - doFnInvoker = DoFnInvokers.invokerFor(dofn); - doFnInvoker.invokeSetup(); - } - - @Override - public void close() throws Exception { - doFnInvoker.invokeTeardown(); - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java ---------------------------------------------------------------------- diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java deleted file mode 100644 index 12222b4..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.api.common.functions.BroadcastVariableInitializer; - -/** - * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map} - * from window to side input. 
- */ -public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow> - implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> { - - PCollectionView<ViewT> view; - - public SideInputInitializer(PCollectionView<ViewT> view) { - this.view = view; - } - - @Override - public Map<BoundedWindow, ViewT> initializeBroadcastVariable( - Iterable<WindowedValue<ElemT>> inputValues) { - - // first partition into windows - Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>(); - for (WindowedValue<ElemT> value: inputValues) { - for (BoundedWindow window: value.getWindows()) { - List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window); - if (windowedValues == null) { - windowedValues = new ArrayList<>(); - partitionedElements.put(window, windowedValues); - } - windowedValues.add(value); - } - } - - Map<BoundedWindow, ViewT> resultMap = new HashMap<>(); - - for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements: - partitionedElements.entrySet()) { - - @SuppressWarnings("unchecked") - Iterable<WindowedValue<?>> elementsIterable = - (List<WindowedValue<?>>) (List<?>) elements.getValue(); - - resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable)); - } - - return resultMap; - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java deleted file mode 100644 index 9f11212..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Internal implementation of the Beam runner for Apache Flink. - */ -package org.apache.beam.runners.flink.translation.functions;
