mr-runner: translate empty flatten into EmptySource, this fixes few empty FalttenTests.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/99bffd2a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/99bffd2a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/99bffd2a Branch: refs/heads/mr-runner Commit: 99bffd2a75b7461d15723567a57db6d3b17367cd Parents: 8627913 Author: Pei He <[email protected]> Authored: Fri Sep 1 14:11:30 2017 +0800 Committer: Pei He <[email protected]> Committed: Fri Sep 1 17:13:53 2017 +0800 ---------------------------------------------------------------------- .../translation/FlattenTranslator.java | 71 +++++++++++++++++++- 1 file changed, 68 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/99bffd2a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java index b966f2a..b869936 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FlattenTranslator.java @@ -17,6 +17,14 @@ */ package org.apache.beam.runners.mapreduce.translation; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Flatten; /** @@ -26,11 +34,68 @@ public class FlattenTranslator<T> extends TransformTranslator.Default<Flatten.PC @Override public void translateNode(Flatten.PCollections<T> transform, TranslationContext context) { TranslationContext.UserGraphContext userGraphContext = context.getUserGraphContext(); - - Operation<?> operation = new FlattenOperation(); + List<Graphs.Tag> inputTags = userGraphContext.getInputTags(); + Operation<?> operation; + if (inputTags.isEmpty()) { + // Create a empty source + operation = new SourceReadOperation(new EmptySource(), userGraphContext.getOnlyOutputTag()); + } else { + operation = new FlattenOperation(); + } context.addInitStep( Graphs.Step.of(userGraphContext.getStepName(), operation), - userGraphContext.getInputTags(), + inputTags, userGraphContext.getOutputTags()); } + + private static class EmptySource extends BoundedSource<Void> { + @Override + public List<? extends BoundedSource<Void>> split( + long desiredBundleSizeBytes, PipelineOptions options) throws Exception { + return Collections.EMPTY_LIST; + } + + @Override + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { + return 0; + } + + @Override + public BoundedReader<Void> createReader(PipelineOptions options) throws IOException { + return new BoundedReader<Void>() { + @Override + public BoundedSource<Void> getCurrentSource() { + return EmptySource.this; + } + + @Override + public boolean start() throws IOException { + return false; + } + + @Override + public boolean advance() throws IOException { + return false; + } + + @Override + public Void getCurrent() throws NoSuchElementException { + throw new NoSuchElementException(); + } + + @Override + public void close() throws IOException { + } + }; + } + + @Override + public void validate() { + } + + @Override + public Coder<Void> getDefaultOutputCoder() { + return VoidCoder.of(); + } + } }
