Repository: beam Updated Branches: refs/heads/master 2cb4b03de -> 711faffef
[BEAM-2380] Forward additional outputs to DoFnRunner in Flink Batch Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9afe395b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9afe395b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9afe395b Branch: refs/heads/master Commit: 9afe395bbddd2382c5222dd3145a0be3cdf7077a Parents: 2cb4b03 Author: Aljoscha Krettek <[email protected]> Authored: Tue May 30 10:56:56 2017 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Tue May 30 10:57:40 2017 +0200 ---------------------------------------------------------------------- .../flink/translation/functions/FlinkDoFnFunction.java | 8 +++++--- .../translation/functions/FlinkStatefulDoFnFunction.java | 8 +++++--- 2 files changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 9205bce..42a8833 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -17,7 +17,8 @@ */ package org.apache.beam.runners.flink.translation.functions; -import java.util.Collections; +import com.google.common.collect.Lists; +import java.util.List; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -97,13 +98,14 @@ public class FlinkDoFnFunction<InputT, OutputT> new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap); } + List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); + DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), doFn, new FlinkSideInputReader(sideInputs, runtimeContext), outputManager, mainOutputTag, - // see SimpleDoFnRunner, just use it to limit number of additional outputs - Collections.<TupleTag<?>>emptyList(), + additionalOutputTags, new FlinkNoOpStepContext(), windowingStrategy); http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index 6517bf2..b075768 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -19,8 +19,9 @@ package org.apache.beam.runners.flink.translation.functions; import static org.apache.flink.util.Preconditions.checkArgument; -import java.util.Collections; +import com.google.common.collect.Lists; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; @@ -114,13 +115,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT> timerInternals.advanceProcessingTime(Instant.now()); timerInternals.advanceSynchronizedProcessingTime(Instant.now()); + List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet()); + DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), dofn, new FlinkSideInputReader(sideInputs, runtimeContext), outputManager, mainOutputTag, - // see SimpleDoFnRunner, just use it to limit number of additional outputs - Collections.<TupleTag<?>>emptyList(), + additionalOutputTags, new FlinkNoOpStepContext() { @Override public StateInternals stateInternals() {
