Repository: beam Updated Branches: refs/heads/master 3082178b3 -> b261d4890
[BEAM-1560] Use provided Function Runners in Flink Batch Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0ae2a385 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0ae2a385 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0ae2a385 Branch: refs/heads/master Commit: 0ae2a3851cbfe1ec2f2c7237954b18c9951c76a3 Parents: 3082178 Author: JingsongLi <[email protected]> Authored: Mon Feb 27 18:08:34 2017 +0800 Committer: Aljoscha Krettek <[email protected]> Committed: Mon Feb 27 13:22:42 2017 +0100 ---------------------------------------------------------------------- .../flink/FlinkBatchTransformTranslators.java | 9 +- .../runners/flink/OldPerKeyCombineFnRunner.java | 62 ----- .../flink/OldPerKeyCombineFnRunners.java | 155 ----------- .../functions/FlinkAggregatorFactory.java | 53 ++++ .../functions/FlinkDoFnFunction.java | 96 +++---- .../FlinkMergingNonShuffleReduceFunction.java | 65 ++--- .../FlinkMergingPartialReduceFunction.java | 45 ++-- .../functions/FlinkMergingReduceFunction.java | 39 ++- .../functions/FlinkMultiOutputDoFnFunction.java | 101 +++---- .../FlinkMultiOutputProcessContext.java | 118 -------- .../functions/FlinkNoElementAssignContext.java | 68 ----- .../functions/FlinkNoOpStepContext.java | 73 +++++ .../functions/FlinkPartialReduceFunction.java | 53 ++-- .../functions/FlinkProcessContextBase.java | 267 ------------------- .../functions/FlinkReduceFunction.java | 49 ++-- .../functions/FlinkSideInputReader.java | 80 ++++++ .../FlinkSingleOutputProcessContext.java | 69 ----- 17 files changed, 419 insertions(+), 983 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index de8b43f..99651c3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -396,7 +396,7 @@ class FlinkBatchTransformTranslators { inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); // construct a map from side input to WindowingStrategy so that - // the OldDoFn runner can map main-input windows to side input windows + // the DoFn runner can map main-input windows to side input windows Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); for (PCollectionView<?> sideInput: transform.getSideInputs()) { sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); @@ -544,7 +544,7 @@ class FlinkBatchTransformTranslators { List<PCollectionView<?>> sideInputs = transform.getSideInputs(); // construct a map from side input to WindowingStrategy so that - // the OldDoFn runner can map main-input windows to side input windows + // the DoFn runner can map main-input windows to side input windows Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); for (PCollectionView<?> sideInput: sideInputs) { sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); @@ -627,7 +627,7 @@ class FlinkBatchTransformTranslators { List<PCollectionView<?>> sideInputs = transform.getSideInputs(); // construct a map from side input to WindowingStrategy so that - // the OldDoFn runner can map main-input windows to side input windows + // the DoFn runner can map main-input windows to side input windows Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputStrategies = new HashMap<>(); for (PCollectionView<?> sideInput: sideInputs) { sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); @@ -640,7 +640,8 @@ class FlinkBatchTransformTranslators { windowingStrategy, sideInputStrategies, context.getPipelineOptions(), - outputMap); + outputMap, + transform.getMainOutputTag()); MapPartitionOperator<WindowedValue<InputT>, WindowedValue<RawUnionValue>> taggedDataSet = new MapPartitionOperator<>( http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java deleted file mode 100644 index 71c3aa4..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import java.io.Serializable; -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; - -/** - * An interface that runs a {@link PerKeyCombineFn} with unified APIs using - * {@link OldDoFn.ProcessContext}. - */ -@Deprecated -public interface OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> extends Serializable { - /** - * Forwards the call to a {@link PerKeyCombineFn} to create the accumulator in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to add the input in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT addInput(K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to merge accumulators in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c); - - /** - * Forwards the call to a {@link PerKeyCombineFn} to extract the output in a {@link OldDoFn}. - * - * <p>It constructs a {@code CombineWithContext.Context} from {@code OldDoFn.ProcessContext} - * if it is required. - */ - OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c); -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java deleted file mode 100644 index 90894f2..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; -import org.apache.beam.sdk.transforms.CombineWithContext; -import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * Static utility methods that provide {@link OldPerKeyCombineFnRunner} implementations - * for different keyed combine functions. - */ -@Deprecated -public class OldPerKeyCombineFnRunners { - /** - * Returns a {@link PerKeyCombineFnRunner} from a {@link PerKeyCombineFn}. - */ - public static <K, InputT, AccumT, OutputT> OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> - create(PerKeyCombineFn<K, InputT, AccumT, OutputT> perKeyCombineFn) { - if (perKeyCombineFn instanceof KeyedCombineFnWithContext) { - return new KeyedCombineFnWithContextRunner<>( - (KeyedCombineFnWithContext<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else if (perKeyCombineFn instanceof KeyedCombineFn) { - return new KeyedCombineFnRunner<>( - (KeyedCombineFn<K, InputT, AccumT, OutputT>) perKeyCombineFn); - } else { - throw new IllegalStateException( - String.format("Unknown type of CombineFn: %s", perKeyCombineFn.getClass())); - } - } - - /** Returns a {@code Combine.Context} that wraps a {@code OldDoFn.ProcessContext}. */ - private static CombineWithContext.Context createFromProcessContext( - final OldDoFn<?, ?>.ProcessContext c) { - return new CombineWithContext.Context() { - @Override - public PipelineOptions getPipelineOptions() { - return c.getPipelineOptions(); - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - return c.sideInput(view); - } - }; - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFn}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFn}. - */ - private static class KeyedCombineFnRunner<K, InputT, AccumT, OutputT> - implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn; - - private KeyedCombineFnRunner( - KeyedCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; - } - - @Override - public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.createAccumulator(key); - } - - @Override - public AccumT addInput( - K key, AccumT accumulator, InputT input, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.addInput(key, accumulator, input); - } - - @Override - public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.mergeAccumulators(key, accumulators); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFn.extractOutput(key, accumulator); - } - - @Override - public String toString() { - return keyedCombineFn.toString(); - } - } - - /** - * An implementation of {@link PerKeyCombineFnRunner} with {@link KeyedCombineFnWithContext}. - * - * <p>It forwards functions calls to the {@link KeyedCombineFnWithContext}. - */ - private static class KeyedCombineFnWithContextRunner<K, InputT, AccumT, OutputT> - implements OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> { - private final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext; - - private KeyedCombineFnWithContextRunner( - KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedCombineFnWithContext) { - this.keyedCombineFnWithContext = keyedCombineFnWithContext; - } - - @Override - public AccumT createAccumulator(K key, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.createAccumulator(key, - createFromProcessContext(c)); - } - - @Override - public AccumT addInput( - K key, AccumT accumulator, InputT value, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.addInput(key, accumulator, value, - createFromProcessContext(c)); - } - - @Override - public AccumT mergeAccumulators( - K key, Iterable<AccumT> accumulators, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.mergeAccumulators( - key, accumulators, createFromProcessContext(c)); - } - - @Override - public OutputT extractOutput(K key, AccumT accumulator, OldDoFn<?, ?>.ProcessContext c) { - return keyedCombineFnWithContext.extractOutput(key, accumulator, - createFromProcessContext(c)); - } - - @Override - public String toString() { - return keyedCombineFnWithContext.toString(); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java new file mode 100644 index 0000000..fb2493b --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkAggregatorFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import org.apache.beam.runners.core.AggregatorFactory; +import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * A {@link AggregatorFactory} for the Flink Batch Runner. + */ +public class FlinkAggregatorFactory implements AggregatorFactory{ + + private final RuntimeContext runtimeContext; + + public FlinkAggregatorFactory(RuntimeContext runtimeContext) { + this.runtimeContext = runtimeContext; + } + + @Override + public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( + Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName, + Combine.CombineFn<InputT, AccumT, OutputT> combine) { + @SuppressWarnings("unchecked") + SerializableFnAggregatorWrapper<InputT, OutputT> result = + (SerializableFnAggregatorWrapper<InputT, OutputT>) + runtimeContext.getAccumulator(aggregatorName); + + if (result == null) { + result = new SerializableFnAggregatorWrapper<>(combine); + runtimeContext.addAccumulator(aggregatorName, result); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 8b2bcc6..7081aad 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -17,50 +17,51 @@ */ package org.apache.beam.runners.flink.translation.functions; +import java.util.Collections; import java.util.Map; -import org.apache.beam.runners.core.DoFnAdapters; -import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; /** - * Encapsulates a {@link OldDoFn} + * Encapsulates a {@link DoFn} * inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}. */ public class FlinkDoFnFunction<InputT, OutputT> extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<OutputT>> { - private final OldDoFn<InputT, OutputT> doFn; private final SerializedPipelineOptions serializedOptions; + private final DoFn<InputT, OutputT> doFn; private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - private final boolean requiresWindowAccess; - private final boolean hasSideInputs; - private final WindowingStrategy<?, ?> windowingStrategy; + private transient DoFnInvoker<InputT, OutputT> doFnInvoker; + public FlinkDoFnFunction( DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options) { - this.doFn = DoFnAdapters.toOldDoFn(doFn); + + this.doFn = doFn; this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(options); this.windowingStrategy = windowingStrategy; - this.requiresWindowAccess = - DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow(); - this.hasSideInputs = !sideInputs.isEmpty(); } @Override @@ -68,48 +69,53 @@ public class FlinkDoFnFunction<InputT, OutputT> Iterable<WindowedValue<InputT>> values, Collector<WindowedValue<OutputT>> out) throws Exception { - FlinkSingleOutputProcessContext<InputT, OutputT> context = - new FlinkSingleOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, - out); - - this.doFn.startBundle(context); - - if (!requiresWindowAccess || hasSideInputs) { - // we don't need to explode the windows - for (WindowedValue<InputT> value : values) { - context.setWindowedValue(value); - doFn.processElement(context); - } - } else { - // we need to explode the windows because we have per-window - // side inputs and window access also only works if an element - // is in only one window - for (WindowedValue<InputT> value : values) { - for (WindowedValue<InputT> explodedValue : value.explodeWindows()) { - context.setWindowedValue(explodedValue); - doFn.processElement(context); - } - } + RuntimeContext runtimeContext = getRuntimeContext(); + + DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner( + serializedOptions.getPipelineOptions(), doFn, + new FlinkSideInputReader(sideInputs, runtimeContext), + new DoFnOutputManager(out), + new TupleTag<OutputT>() { + }, + Collections.<TupleTag<?>>emptyList(), + new FlinkNoOpStepContext(), + new FlinkAggregatorFactory(runtimeContext), + windowingStrategy); + + doFnRunner.startBundle(); + + for (WindowedValue<InputT> value : values) { + doFnRunner.processElement(value); } - // set the windowed value to null so that the special logic for outputting - // in startBundle/finishBundle kicks in - context.setWindowedValue(null); - this.doFn.finishBundle(context); + doFnRunner.finishBundle(); } @Override public void open(Configuration parameters) throws Exception { - doFn.setup(); + doFnInvoker = DoFnInvokers.invokerFor(doFn); + doFnInvoker.invokeSetup(); } @Override public void close() throws Exception { - doFn.teardown(); + doFnInvoker.invokeTeardown(); + } + + private class DoFnOutputManager + implements DoFnRunners.OutputManager { + + private Collector collector; + + DoFnOutputManager(Collector<WindowedValue<OutputT>> collector) { + this.collector = collector; + } + + @Override + @SuppressWarnings("unchecked") + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + collector.collect(output); + } } + } http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 5ec6a77..26fd0b4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -24,9 +24,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; @@ -52,13 +51,11 @@ import org.joda.time.Instant; * yet be in their correct windows for side-input access. */ public class FlinkMergingNonShuffleReduceFunction< - K, InputT, AccumT, OutputT, W extends IntervalWindow> + K, InputT, AccumT, OutputT, W extends IntervalWindow> extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> { private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn; - private final OldDoFn<KV<K, InputT>, KV<K, OutputT>> doFn; - private final WindowingStrategy<?, W> windowingStrategy; private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; @@ -78,13 +75,6 @@ public class FlinkMergingNonShuffleReduceFunction< this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - // dummy OldDoFn because we need one for ProcessContext - this.doFn = new OldDoFn<KV<K, InputT>, KV<K, OutputT>>() { - @Override - public void processElement(ProcessContext c) throws Exception { - - } - }; } @Override @@ -92,17 +82,13 @@ public class FlinkMergingNonShuffleReduceFunction< Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext = - new FlinkSingleOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, out - ); + PipelineOptions options = serializedOptions.getPipelineOptions(); - OldPerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = - OldPerKeyCombineFnRunners.create(combineFn); + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = @@ -112,8 +98,8 @@ public class FlinkMergingNonShuffleReduceFunction< // memory // this seems very unprudent, but correct, for now List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, InputT>> inputValue: elements) { - for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) { + for (WindowedValue<KV<K, InputT>> inputValue : elements) { + for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { sortedInput.add(exploded); } } @@ -141,9 +127,10 @@ public class FlinkMergingNonShuffleReduceFunction< IntervalWindow currentWindow = (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); InputT firstValue = currentValue.getValue().getValue(); - processContext.setWindowedValue(currentValue); - AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); - accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); + AccumT accumulator = + combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, + options, sideInputReader, currentValue.getWindows()); // we use this to keep track of the timestamps assigned by the OutputTimeFn Instant windowTimestamp = @@ -151,14 +138,15 @@ public class FlinkMergingNonShuffleReduceFunction< while (iterator.hasNext()) { WindowedValue<KV<K, InputT>> nextValue = iterator.next(); - IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); if (currentWindow.equals(nextWindow)) { // continue accumulating and merge windows InputT value = nextValue.getValue().getValue(); - processContext.setWindowedValue(nextValue); - accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); windowTimestamp = outputTimeFn.combine( windowTimestamp, @@ -168,24 +156,29 @@ public class FlinkMergingNonShuffleReduceFunction< // emit the value that we currently have out.collect( WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), windowTimestamp, currentWindow, PaneInfo.NO_FIRING)); currentWindow = nextWindow; + currentValue = nextValue; InputT value = nextValue.getValue().getValue(); - processContext.setWindowedValue(nextValue); - accumulator = combineFnRunner.createAccumulator(key, processContext); - accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); } + } // emit the final accumulator out.collect( WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), windowTimestamp, currentWindow, PaneInfo.NO_FIRING)); http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java index cf058e8..c68f155 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java @@ -24,8 +24,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -45,7 +45,7 @@ import org.joda.time.Instant; * same behaviour as {@code MergeOverlappingIntervalWindows}. */ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow> - extends FlinkPartialReduceFunction<K, InputT, AccumT, W> { + extends FlinkPartialReduceFunction<K, InputT, AccumT, W> { public FlinkMergingPartialReduceFunction( CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, @@ -60,17 +60,13 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { - FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = - new FlinkSingleOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, out - ); + PipelineOptions options = serializedOptions.getPipelineOptions(); - OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = - OldPerKeyCombineFnRunners.create(combineFn); + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = @@ -80,8 +76,8 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte // memory // this seems very unprudent, but correct, for now List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, InputT>> inputValue: elements) { - for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) { + for (WindowedValue<KV<K, InputT>> inputValue : elements) { + for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { sortedInput.add(exploded); } } @@ -109,9 +105,10 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte IntervalWindow currentWindow = (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); InputT firstValue = currentValue.getValue().getValue(); - processContext.setWindowedValue(currentValue); - AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); - accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); + AccumT accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, + options, sideInputReader, currentValue.getWindows()); // we use this to keep track of the timestamps assigned by the OutputTimeFn Instant windowTimestamp = @@ -125,8 +122,8 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte // continue accumulating and merge windows InputT value = nextValue.getValue().getValue(); - processContext.setWindowedValue(nextValue); - accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); windowTimestamp = outputTimeFn.combine( windowTimestamp, @@ -142,10 +139,12 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte PaneInfo.NO_FIRING)); currentWindow = nextWindow; + currentValue = nextValue; InputT value = nextValue.getValue().getValue(); - processContext.setWindowedValue(nextValue); - accumulator = combineFnRunner.createAccumulator(key, processContext); - accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java index 4fa4578..84b3adc 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java @@ -26,8 +26,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -62,29 +62,24 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi Iterable<WindowedValue<KV<K, AccumT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = - new FlinkSingleOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, out - ); + PipelineOptions options = serializedOptions.getPipelineOptions(); - OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = - OldPerKeyCombineFnRunners.create(combineFn); + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); + + PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - // get all elements so that we can sort them, has to fit into // memory // this seems very unprudent, but correct, for now ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, AccumT>> inputValue: elements) { - for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) { + for (WindowedValue<KV<K, AccumT>> inputValue : elements) { + for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) { sortedInput.add(exploded); } } @@ -127,25 +122,24 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi if (nextWindow.equals(currentWindow)) { // continue accumulating and merge windows - processContext.setWindowedValue(nextValue); - accumulator = combineFnRunner.mergeAccumulators( - key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext); + key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), + options, sideInputReader, currentValue.getWindows()); windowTimestamps.add(nextValue.getTimestamp()); } else { out.collect( WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), outputTimeFn.merge(currentWindow, windowTimestamps), currentWindow, PaneInfo.NO_FIRING)); windowTimestamps.clear(); - processContext.setWindowedValue(nextValue); - currentWindow = nextWindow; + currentValue = nextValue; accumulator = nextValue.getValue().getValue(); windowTimestamps.add(nextValue.getTimestamp()); } @@ -154,7 +148,8 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi // emit the final accumulator out.collect( WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), outputTimeFn.merge(currentWindow, windowTimestamps), currentWindow, PaneInfo.NO_FIRING)); http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index aeeabbf..27ba5ac 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -17,24 +17,27 @@ */ package org.apache.beam.runners.flink.translation.functions; +import java.util.Collections; import java.util.Map; -import org.apache.beam.runners.core.DoFnAdapters; -import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.api.common.functions.RichMapPartitionFunction; +import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; /** - * Encapsulates a {@link OldDoFn} that can emit to multiple + * Encapsulates a {@link DoFn} that can emit to multiple * outputs inside a Flink {@link org.apache.flink.api.common.functions.RichMapPartitionFunction}. * * <p>We get a mapping from {@link org.apache.beam.sdk.values.TupleTag} to output index @@ -44,33 +47,30 @@ import org.apache.flink.util.Collector; public class FlinkMultiOutputDoFnFunction<InputT, OutputT> extends RichMapPartitionFunction<WindowedValue<InputT>, WindowedValue<RawUnionValue>> { - private final OldDoFn<InputT, OutputT> doFn; + private final DoFn<InputT, OutputT> doFn; private final SerializedPipelineOptions serializedOptions; private final Map<TupleTag<?>, Integer> outputMap; private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - - private final boolean requiresWindowAccess; - private final boolean hasSideInputs; - private final WindowingStrategy<?, ?> windowingStrategy; + private TupleTag<OutputT> mainOutputTag; + private transient DoFnInvoker<InputT, OutputT> doFnInvoker; public FlinkMultiOutputDoFnFunction( DoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions options, - Map<TupleTag<?>, Integer> outputMap) { - this.doFn = DoFnAdapters.toOldDoFn(doFn); + Map<TupleTag<?>, Integer> outputMap, + TupleTag<OutputT> mainOutputTag) { + this.doFn = doFn; this.serializedOptions = new SerializedPipelineOptions(options); this.outputMap = outputMap; - this.requiresWindowAccess = - DoFnSignatures.signatureForDoFn(doFn).processElement().observesWindow(); - this.hasSideInputs = !sideInputs.isEmpty(); this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; + this.mainOutputTag = mainOutputTag; } @Override @@ -78,49 +78,54 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT> Iterable<WindowedValue<InputT>> values, Collector<WindowedValue<RawUnionValue>> out) throws Exception { - FlinkMultiOutputProcessContext<InputT, OutputT> context = - new FlinkMultiOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, out, - outputMap - ); - - this.doFn.startBundle(context); - - if (!requiresWindowAccess || hasSideInputs) { - // we don't need to explode the windows - for (WindowedValue<InputT> value : values) { - context.setWindowedValue(value); - doFn.processElement(context); - } - } else { - // we need to explode the windows because we have per-window - // side inputs and window access also only works if an element - // is in only one window - for (WindowedValue<InputT> value : values) { - for (WindowedValue<InputT> explodedValue : value.explodeWindows()) { - context.setWindowedValue(value); - doFn.processElement(context); - } - } + RuntimeContext runtimeContext = getRuntimeContext(); + + DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner( + serializedOptions.getPipelineOptions(), doFn, + new FlinkSideInputReader(sideInputs, runtimeContext), + new DoFnOutputManager(out), + mainOutputTag, + // see SimpleDoFnRunner, just use it to limit number of side outputs + Collections.<TupleTag<?>>emptyList(), + new FlinkNoOpStepContext(), + new FlinkAggregatorFactory(runtimeContext), + windowingStrategy); + + doFnRunner.startBundle(); + + for (WindowedValue<InputT> value : values) { + doFnRunner.processElement(value); } - // set the windowed value to null so that the special logic for outputting - // in startBundle/finishBundle kicks in - context.setWindowedValue(null); - this.doFn.finishBundle(context); + doFnRunner.finishBundle(); + } @Override public void open(Configuration parameters) throws Exception { - doFn.setup(); + doFnInvoker = DoFnInvokers.invokerFor(doFn); + doFnInvoker.invokeSetup(); } @Override public void close() throws Exception { - doFn.teardown(); + doFnInvoker.invokeTeardown(); + } + + private class DoFnOutputManager + implements DoFnRunners.OutputManager { + + private Collector<WindowedValue<RawUnionValue>> collector; + + DoFnOutputManager(Collector<WindowedValue<RawUnionValue>> collector) { + this.collector = collector; + } + + @Override + public <T> void output(TupleTag<T> tag, WindowedValue<T> output) { + collector.collect(WindowedValue.of(new RawUnionValue(outputMap.get(tag), output.getValue()), + output.getTimestamp(), output.getWindows(), output.getPane())); + } } + } http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java deleted file mode 100644 index 7882b5f..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import java.util.Collection; -import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.join.RawUnionValue; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports side - * outputs. - */ -class FlinkMultiOutputProcessContext<InputT, OutputT> - extends FlinkProcessContextBase<InputT, OutputT> { - - private final Collector<WindowedValue<RawUnionValue>> collector; - private final Map<TupleTag<?>, Integer> outputMap; - - FlinkMultiOutputProcessContext( - PipelineOptions pipelineOptions, - RuntimeContext runtimeContext, - OldDoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ?> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - Collector<WindowedValue<RawUnionValue>> collector, - Map<TupleTag<?>, Integer> outputMap) { - super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs); - this.collector = collector; - this.outputMap = outputMap; - } - - @Override - protected void outputWithTimestampAndWindow( - OutputT value, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - collector.collect(WindowedValue.of(new RawUnionValue(0, value), timestamp, windows, pane)); - } - - @Override - @SuppressWarnings("unchecked") - public <T> void sideOutput(TupleTag<T> tag, T value) { - if (windowedValue != null) { - sideOutputWithTimestamp(tag, value, windowedValue.getTimestamp()); - } else { - sideOutputWithTimestamp(tag, value, null); - } - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) { - Integer index = outputMap.get(tag); - - if (index == null) { - throw new IllegalArgumentException("Unknown side output tag: " + tag); - } - - outputUnionValue(value, timestamp, new RawUnionValue(index, value)); - } - - private <T> void outputUnionValue(T value, Instant timestamp, RawUnionValue unionValue) { - if (windowedValue == null) { - // we are in startBundle() or finishBundle() - - try { - Collection<? extends BoundedWindow> windows = - windowingStrategy - .getWindowFn() - .assignWindows( - new FlinkNoElementAssignContext( - windowingStrategy.getWindowFn(), value, timestamp)); - - collector.collect( - WindowedValue.of( - unionValue, - timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), - windows, - PaneInfo.NO_FIRING)); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - collector.collect( - WindowedValue.of( - unionValue, - windowedValue.getTimestamp(), - windowedValue.getWindows(), - windowedValue.getPane())); - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java deleted file mode 100644 index ad7255b..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.joda.time.Instant; - -/** - * {@link WindowFn.AssignContext} for calling a {@link WindowFn} for elements emitted from - * {@link OldDoFn#startBundle(OldDoFn.Context)} - * or {@link OldDoFn#finishBundle(OldDoFn.Context)}. - * - * <p>In those cases the {@code WindowFn} is not allowed to access any element information. - */ -class FlinkNoElementAssignContext<InputT, W extends BoundedWindow> - extends WindowFn<InputT, W>.AssignContext { - - private final InputT element; - private final Instant timestamp; - - FlinkNoElementAssignContext( - WindowFn<InputT, W> fn, - InputT element, - Instant timestamp) { - fn.super(); - - this.element = element; - // the timestamp can be null, in that case output is called - // without a timestamp - this.timestamp = timestamp; - } - - @Override - public InputT element() { - return element; - } - - @Override - public Instant timestamp() { - if (timestamp != null) { - return timestamp; - } else { - throw new UnsupportedOperationException("No timestamp available."); - } - } - - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException("No window available."); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java new file mode 100644 index 0000000..d901d8e --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoOpStepContext.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import java.io.IOException; +import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StateInternals; +import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.TupleTag; + +/** + * A {@link StepContext} for Flink Batch Runner execution. + */ +public class FlinkNoOpStepContext implements StepContext { + + @Override + public String getStepName() { + return null; + } + + @Override + public String getTransformName() { + return null; + } + + @Override + public void noteOutput(WindowedValue<?> output) { + + } + + @Override + public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> output) { + + } + + @Override + public <T, W extends BoundedWindow> void writePCollectionViewData( + TupleTag<?> tag, + Iterable<WindowedValue<T>> data, + Coder<Iterable<WindowedValue<T>>> dataCoder, + W window, + Coder<W> windowCoder) throws IOException { + } + + @Override + public StateInternals<?> stateInternals() { + return null; + } + + @Override + public TimerInternals timerInternals() { + return null; + } +} + http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index 7db30d1..1d1ff9f 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -24,9 +24,8 @@ import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; @@ -55,8 +54,6 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn; - protected final OldDoFn<KV<K, InputT>, KV<K, AccumT>> doFn; - protected final WindowingStrategy<?, W> windowingStrategy; protected final SerializedPipelineOptions serializedOptions; @@ -74,13 +71,6 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - // dummy OldDoFn because we need one for ProcessContext - this.doFn = new OldDoFn<KV<K, InputT>, KV<K, AccumT>>() { - @Override - public void processElement(ProcessContext c) throws Exception { - - } - }; } @Override @@ -88,17 +78,13 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind Iterable<WindowedValue<KV<K, InputT>>> elements, Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { - FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext = - new FlinkSingleOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, out - ); + PipelineOptions options = serializedOptions.getPipelineOptions(); + + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); - OldPerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = - OldPerKeyCombineFnRunners.create(combineFn); + PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = @@ -108,8 +94,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind // memory // this seems very unprudent, but correct, for now ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, InputT>> inputValue: elements) { - for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) { + for (WindowedValue<KV<K, InputT>> inputValue : elements) { + for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { sortedInput.add(exploded); } } @@ -132,9 +118,10 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind K key = currentValue.getValue().getKey(); BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); InputT firstValue = currentValue.getValue().getValue(); - processContext.setWindowedValue(currentValue); - AccumT accumulator = combineFnRunner.createAccumulator(key, processContext); - accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext); + AccumT accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, firstValue, + options, sideInputReader, currentValue.getWindows()); // we use this to keep track of the timestamps assigned by the OutputTimeFn Instant windowTimestamp = @@ -147,8 +134,8 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind if (nextWindow.equals(currentWindow)) { // continue accumulating InputT value = nextValue.getValue().getValue(); - processContext.setWindowedValue(nextValue); - accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); windowTimestamp = outputTimeFn.combine( windowTimestamp, @@ -164,10 +151,12 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind PaneInfo.NO_FIRING)); currentWindow = nextWindow; + currentValue = nextValue; InputT value = nextValue.getValue().getValue(); - processContext.setWindowedValue(nextValue); - accumulator = combineFnRunner.createAccumulator(key, processContext); - accumulator = combineFnRunner.addInput(key, accumulator, value, processContext); + accumulator = combineFnRunner.createAccumulator(key, + options, sideInputReader, currentValue.getWindows()); + accumulator = combineFnRunner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java deleted file mode 100644 index 9b83eb4..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import static com.google.common.base.Preconditions.checkNotNull; - -import com.google.common.collect.Iterables; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; -import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.runners.core.StateInternals; -import org.apache.beam.runners.core.TimerInternals; -import org.apache.beam.runners.core.WindowingInternals; -import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.joda.time.Instant; - -/** - * {@link OldDoFn.ProcessContext} for our Flink Wrappers. - */ -abstract class FlinkProcessContextBase<InputT, OutputT> - extends OldDoFn<InputT, OutputT>.ProcessContext { - - private final PipelineOptions pipelineOptions; - private final RuntimeContext runtimeContext; - private final boolean requiresWindowAccess; - protected final WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy; - private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; - - protected WindowedValue<InputT> windowedValue; - - FlinkProcessContextBase( - PipelineOptions pipelineOptions, - RuntimeContext runtimeContext, - OldDoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ? extends BoundedWindow>> sideInputs) { - doFn.super(); - checkNotNull(pipelineOptions); - checkNotNull(runtimeContext); - checkNotNull(doFn); - - this.pipelineOptions = pipelineOptions; - this.runtimeContext = runtimeContext; - this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess; - this.windowingStrategy = windowingStrategy; - this.sideInputs = sideInputs; - - super.setupDelegateAggregators(); - } - - public void setWindowedValue(WindowedValue<InputT> windowedValue) { - this.windowedValue = windowedValue; - } - - @Override - public InputT element() { - return this.windowedValue.getValue(); - } - - - @Override - public Instant timestamp() { - return windowedValue.getTimestamp(); - } - - @Override - public BoundedWindow window() { - if (!requiresWindowAccess) { - throw new UnsupportedOperationException( - "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess."); - } - return Iterables.getOnlyElement(windowedValue.getWindows()); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public WindowingInternals<InputT, OutputT> windowingInternals() { - - return new WindowingInternals<InputT, OutputT>() { - - @Override - public StateInternals stateInternals() { - throw new UnsupportedOperationException(); - } - - @Override - public void outputWindowedValue( - OutputT value, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputWithTimestampAndWindow(value, timestamp, windows, pane); - } - - @Override - public <SideOutputT> void sideOutputWindowedValue( - TupleTag<SideOutputT> tag, - SideOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - throw new UnsupportedOperationException(); - } - - @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException(); - } - - @Override - public Collection<? extends BoundedWindow> windows() { - return windowedValue.getWindows(); - } - - @Override - public PaneInfo pane() { - return windowedValue.getPane(); - } - - @Override - public <ViewT> ViewT sideInput( - PCollectionView<ViewT> view, - BoundedWindow sideInputWindow) { - - checkNotNull(view, "View passed to sideInput cannot be null"); - checkNotNull( - sideInputs.get(view), - "Side input for " + view + " not available."); - - Map<BoundedWindow, ViewT> sideInputs = - runtimeContext.getBroadcastVariableWithInitializer( - view.getTagInternal().getId(), new SideInputInitializer<>(view)); - return sideInputs.get(sideInputWindow); - } - }; - } - - @Override - public PipelineOptions getPipelineOptions() { - return pipelineOptions; - } - - @Override - public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) { - checkNotNull(view, "View passed to sideInput cannot be null"); - checkNotNull(sideInputs.get(view), "Side input for " + view + " not available."); - Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator(); - BoundedWindow window; - if (!windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is not in any windows"); - } else { - window = windowIter.next(); - if (windowIter.hasNext()) { - throw new IllegalStateException( - "sideInput called when main input element is in multiple windows"); - } - } - - // get the side input strategy for mapping the window - WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view); - - BoundedWindow sideInputWindow = - windowingStrategy.getWindowFn().getSideInputWindow(window); - - Map<BoundedWindow, ViewT> sideInputs = - runtimeContext.getBroadcastVariableWithInitializer( - view.getTagInternal().getId(), new SideInputInitializer<>(view)); - ViewT result = sideInputs.get(sideInputWindow); - if (result == null) { - result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); - } - return result; - } - - @Override - public void output(OutputT value) { - if (windowedValue != null) { - outputWithTimestamp(value, windowedValue.getTimestamp()); - } else { - outputWithTimestamp(value, null); - } - } - - @Override - public final void outputWithTimestamp(OutputT value, Instant timestamp) { - if (windowedValue == null) { - // we are in startBundle() or finishBundle() - - try { - Collection windows = windowingStrategy.getWindowFn().assignWindows( - new FlinkNoElementAssignContext( - windowingStrategy.getWindowFn(), - value, - timestamp)); - - outputWithTimestampAndWindow( - value, - timestamp != null ? timestamp : new Instant(Long.MIN_VALUE), - windows, - PaneInfo.NO_FIRING); - } catch (Exception e) { - throw new RuntimeException(e); - } - } else { - outputWithTimestampAndWindow( - value, timestamp, windowedValue.getWindows(), windowedValue.getPane()); - } - } - - protected abstract void outputWithTimestampAndWindow( - OutputT value, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane); - - @Override - public abstract <T> void sideOutput(TupleTag<T> tag, T output); - - @Override - public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp); - - @Override - public <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> - createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - @SuppressWarnings("unchecked") - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> result = - (SerializableFnAggregatorWrapper<AggInputT, AggOutputT>) - runtimeContext.getAccumulator(name); - - if (result == null) { - result = new SerializableFnAggregatorWrapper<>(combiner); - runtimeContext.addAccumulator(name, result); - } - return result; } -} http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index 81e37f4..3e4f742 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -26,9 +26,8 @@ import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; -import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; @@ -57,8 +56,6 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn; - protected final OldDoFn<KV<K, AccumT>, KV<K, OutputT>> doFn; - protected final WindowingStrategy<?, W> windowingStrategy; protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; @@ -78,13 +75,6 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); - // dummy OldDoFn because we need one for ProcessContext - this.doFn = new OldDoFn<KV<K, AccumT>, KV<K, OutputT>>() { - @Override - public void processElement(ProcessContext c) throws Exception { - - } - }; } @Override @@ -92,17 +82,13 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> Iterable<WindowedValue<KV<K, AccumT>>> elements, Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext = - new FlinkSingleOutputProcessContext<>( - serializedOptions.getPipelineOptions(), - getRuntimeContext(), - doFn, - windowingStrategy, - sideInputs, out - ); + PipelineOptions options = serializedOptions.getPipelineOptions(); + + FlinkSideInputReader sideInputReader = + new FlinkSideInputReader(sideInputs, getRuntimeContext()); - OldPerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = - OldPerKeyCombineFnRunners.create(combineFn); + PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = + PerKeyCombineFnRunners.create(combineFn); @SuppressWarnings("unchecked") OutputTimeFn<? super BoundedWindow> outputTimeFn = @@ -150,17 +136,17 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> if (nextWindow.equals(currentWindow)) { // continue accumulating - processContext.setWindowedValue(nextValue); accumulator = combineFnRunner.mergeAccumulators( - key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext); + key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), + options, sideInputReader, currentValue.getWindows()); windowTimestamps.add(nextValue.getTimestamp()); } else { // emit the value that we currently have - processContext.setWindowedValue(currentValue); out.collect( WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), outputTimeFn.merge(currentWindow, windowTimestamps), currentWindow, PaneInfo.NO_FIRING)); @@ -168,23 +154,18 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> windowTimestamps.clear(); currentWindow = nextWindow; + currentValue = nextValue; accumulator = nextValue.getValue().getValue(); windowTimestamps.add(nextValue.getTimestamp()); } - // we have to keep track so that we can set the context to the right - // windowed value when windows change in the iterable - currentValue = nextValue; } - // if at the end of the iteration we have a change in windows - // the ProcessContext will not have been updated - processContext.setWindowedValue(currentValue); - // emit the final accumulator out.collect( WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)), + KV.of(key, combineFnRunner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), outputTimeFn.merge(currentWindow, windowTimestamps), currentWindow, PaneInfo.NO_FIRING)); http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java new file mode 100644 index 0000000..c317182 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.flink.api.common.functions.RuntimeContext; + +/** + * A {@link SideInputReader} for the Flink Batch Runner. + */ +public class FlinkSideInputReader implements SideInputReader { + + private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs; + + private RuntimeContext runtimeContext; + + public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView, + RuntimeContext runtimeContext) { + sideInputs = new HashMap<>(); + for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) { + sideInputs.put(entry.getKey().getTagInternal(), entry.getValue()); + } + this.runtimeContext = runtimeContext; + } + + @Nullable + @Override + public <T> T get(PCollectionView<T> view, BoundedWindow window) { + checkNotNull(view, "View passed to sideInput cannot be null"); + TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal(); + checkNotNull( + sideInputs.get(tag), + "Side input for " + view + " not available."); + + Map<BoundedWindow, T> sideInputs = + runtimeContext.getBroadcastVariableWithInitializer( + tag.getId(), new SideInputInitializer<>(view)); + T result = sideInputs.get(window); + if (result == null) { + result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); + } + return result; + } + + @Override + public <T> boolean contains(PCollectionView<T> view) { + return sideInputs.containsKey(view.getTagInternal()); + } + + @Override + public boolean isEmpty() { + return sideInputs.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/0ae2a385/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java deleted file mode 100644 index 0db7f5a..0000000 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import java.util.Collection; -import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** {@link OldDoFn.ProcessContext} for {@link FlinkDoFnFunction} with a single main output. */ -class FlinkSingleOutputProcessContext<InputT, OutputT> - extends FlinkProcessContextBase<InputT, OutputT> { - - private final Collector<WindowedValue<OutputT>> collector; - - FlinkSingleOutputProcessContext( - PipelineOptions pipelineOptions, - RuntimeContext runtimeContext, - OldDoFn<InputT, OutputT> doFn, - WindowingStrategy<?, ?> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - Collector<WindowedValue<OutputT>> collector) { - super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs); - this.collector = collector; - } - - @Override - protected void outputWithTimestampAndWindow( - OutputT value, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - collector.collect(WindowedValue.of(value, timestamp, windows, pane)); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T value) { - throw new UnsupportedOperationException(); - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) { - throw new UnsupportedOperationException(); - } -}
