Repository: beam
Updated Branches:
  refs/heads/master 2cb4b03de -> 711faffef


[BEAM-2380] Forward additional outputs to DoFnRunner in Flink Batch


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9afe395b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9afe395b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9afe395b

Branch: refs/heads/master
Commit: 9afe395bbddd2382c5222dd3145a0be3cdf7077a
Parents: 2cb4b03
Author: Aljoscha Krettek <[email protected]>
Authored: Tue May 30 10:56:56 2017 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Tue May 30 10:57:40 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/functions/FlinkDoFnFunction.java       | 8 +++++---
 .../translation/functions/FlinkStatefulDoFnFunction.java     | 8 +++++---
 2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 9205bce..42a8833 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import java.util.Collections;
+import com.google.common.collect.Lists;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -97,13 +98,14 @@ public class FlinkDoFnFunction<InputT, OutputT>
           new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, 
outputMap);
     }
 
+    List<TupleTag<?>> additionalOutputTags = 
Lists.newArrayList(outputMap.keySet());
+
     DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(), doFn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
-        // see SimpleDoFnRunner, just use it to limit number of additional 
outputs
-        Collections.<TupleTag<?>>emptyList(),
+        additionalOutputTags,
         new FlinkNoOpStepContext(),
         windowingStrategy);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 6517bf2..b075768 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -19,8 +19,9 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-import java.util.Collections;
+import com.google.common.collect.Lists;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -114,13 +115,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     timerInternals.advanceProcessingTime(Instant.now());
     timerInternals.advanceSynchronizedProcessingTime(Instant.now());
 
+    List<TupleTag<?>> additionalOutputTags = 
Lists.newArrayList(outputMap.keySet());
+
     DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(), dofn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
-        // see SimpleDoFnRunner, just use it to limit number of additional 
outputs
-        Collections.<TupleTag<?>>emptyList(),
+        additionalOutputTags,
         new FlinkNoOpStepContext() {
           @Override
           public StateInternals stateInternals() {

Reply via email to