Fix faulty Flink Flatten when PCollectionList is empty

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e60a497
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e60a497
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e60a497

Branch: refs/heads/master
Commit: 4e60a497b313414aa2b2968b8def6c6f753908fe
Parents: 26fa0b2
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri May 13 14:17:50 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri May 20 08:08:24 2016 +0200

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         | 32 +++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e60a497/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index a03352e..07785aa 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.AvroIO;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Read;
@@ -61,6 +62,7 @@ import org.apache.beam.sdk.values.TupleTag;
 import com.google.api.client.util.Maps;
 import com.google.common.collect.Lists;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Keys;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -78,6 +80,7 @@ import org.apache.flink.api.java.operators.Grouping;
 import org.apache.flink.api.java.operators.MapPartitionOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.Collector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,7 +94,7 @@ import java.util.Map;
 /**
  * Translators for transforming
  * Dataflow {@link org.apache.beam.sdk.transforms.PTransform}s to
- * Flink {@link org.apache.flink.api.java.DataSet}s
+ * Flink {@link org.apache.flink.api.java.DataSet}s.
  */
 public class FlinkBatchTransformTranslators {
 
@@ -465,15 +468,30 @@ public class FlinkBatchTransformTranslators {
   private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> {
 
     @Override
+    @SuppressWarnings("unchecked")
     public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) {
       List<PCollection<T>> allInputs = context.getInput(transform).getAll();
       DataSet<T> result = null;
-      for(PCollection<T> collection : allInputs) {
-        DataSet<T> current = context.getInputDataSet(collection);
-        if (result == null) {
-          result = current;
-        } else {
-          result = result.union(current);
+      if (allInputs.isEmpty()) {
+        // create an empty dummy source to satisfy downstream operations
+        // we cannot create an empty source in Flink, therefore we have to
+        // add the flatMap that simply never forwards the single element
+        DataSource<String> dummySource =
+            context.getExecutionEnvironment().fromElements("dummy");
+        result = dummySource.flatMap(new FlatMapFunction<String, T>() {
+          @Override
+          public void flatMap(String s, Collector<T> collector) throws Exception {
+            // never return anything
+          }
+        }).returns(new CoderTypeInformation<>((Coder<T>) VoidCoder.of()));
+      } else {
+        for (PCollection<T> collection : allInputs) {
+          DataSet<T> current = context.getInputDataSet(collection);
+          if (result == null) {
+            result = current;
+          } else {
+            result = result.union(current);
+          }
         }
       }
       context.setOutputDataSet(context.getOutput(transform), result);
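
----------------------------------------------------------------------

The fix relies on a small trick: the Flink DataSet API cannot create a source with zero elements, so when the PCollectionList is empty the translator starts from a single-element dummy source and drops that element in a FlatMapFunction, yielding an empty DataSet that downstream operations can still consume. Below is a minimal, self-contained sketch of the same pattern in plain Flink, outside the Beam translator; the class, element types, and variable names are illustrative only and do not come from the commit.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

public class EmptyDataSetExample {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Flink cannot build a source with no elements, so start from a
    // single dummy element ...
    DataSet<String> dummySource = env.fromElements("dummy");

    // ... and flatMap it away: the function never calls out.collect(),
    // so the resulting DataSet is empty but still usable downstream
    // (union, count, collect, ...).
    DataSet<Long> empty = dummySource
        .flatMap(new FlatMapFunction<String, Long>() {
          @Override
          public void flatMap(String value, Collector<Long> out) {
            // intentionally emit nothing
          }
        })
        .returns(BasicTypeInfo.LONG_TYPE_INFO);

    System.out.println("count: " + empty.count()); // prints "count: 0"
  }
}

In the commit itself the element type T is not known statically, so the translator supplies the type information as a CoderTypeInformation built from VoidCoder; the unchecked cast is harmless because the flatMap never emits an element, and the branch is only taken when the PCollectionList passed to Flatten is empty.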
