mr-runner: translate empty flatten into EmptySource; this fixes a few empty
FlattenTests.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/99bffd2a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/99bffd2a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/99bffd2a

Branch: refs/heads/mr-runner
Commit: 99bffd2a75b7461d15723567a57db6d3b17367cd
Parents: 8627913
Author: Pei He <[email protected]>
Authored: Fri Sep 1 14:11:30 2017 +0800
Committer: Pei He <[email protected]>
Committed: Fri Sep 1 17:13:53 2017 +0800

----------------------------------------------------------------------
 .../translation/FlattenTranslator.java          | 71 +++++++++++++++++++-
 1 file changed, 68 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/99bffd2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
index b966f2a..b869936 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java
@@ -17,6 +17,14 @@
  */
 package org.apache.beam.runners.mapreduce.translation;
 
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Flatten;
 
 /**
@@ -26,11 +34,68 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC
   @Override
   public void translateNode(Flatten.PCollections<T> transform, TranslationContext context) {
     TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext();
-
-    Operation<?> operation = new FlattenOperation();
+    List<Graphs.Tag> inputTags = userGraphContext.getInputTags();
+    Operation<?> operation;
+    if (inputTags.isEmpty()) {
+      // Create an empty source
+      operation = new SourceReadOperation(new EmptySource(), userGraphContext.getOnlyOutputTag());
+    } else {
+      operation = new FlattenOperation();
+    }
     context.addInitStep(
         Graphs.Step.of(userGraphContext.getStepName(), operation),
-        userGraphContext.getInputTags(),
+        inputTags,
         userGraphContext.getOutputTags());
   }
+
+  private static class EmptySource extends BoundedSource<Void> {
+    @Override
+    public List<? extends BoundedSource<Void>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      return Collections.EMPTY_LIST;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return 0;
+    }
+
+    @Override
+    public BoundedReader<Void> createReader(PipelineOptions options) throws IOException {
+      return new BoundedReader<Void>() {
+        @Override
+        public BoundedSource<Void> getCurrentSource() {
+          return EmptySource.this;
+        }
+
+        @Override
+        public boolean start() throws IOException {
+          return false;
+        }
+
+        @Override
+        public boolean advance() throws IOException {
+          return false;
+        }
+
+        @Override
+        public Void getCurrent() throws NoSuchElementException {
+          throw new NoSuchElementException();
+        }
+
+        @Override
+        public void close() throws IOException {
+        }
+      };
+    }
+
+    @Override
+    public void validate() {
+    }
+
+    @Override
+    public Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
+  }
 }

Reply via email to