mr-runner: handle no files case in FileSideInputReader for empty views.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca0b15ad Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca0b15ad Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca0b15ad Branch: refs/heads/mr-runner Commit: ca0b15ada6cdbdfba9ac8adb0b8c874477587fae Parents: 807f903 Author: Pei He <p...@apache.org> Authored: Thu Aug 31 17:29:04 2017 +0800 Committer: Pei He <p...@apache.org> Committed: Fri Sep 1 17:13:40 2017 +0800 ---------------------------------------------------------------------- .../mapreduce/translation/FileSideInputReader.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ca0b15ad/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java index cb3a8c4..403de4e 100644 --- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java +++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java @@ -76,14 +76,15 @@ public class FileSideInputReader implements SideInputReader { try { FileSystem fs = pattern.getFileSystem(conf); FileStatus[] files = fs.globStatus(pattern); - // TODO: handle empty views which may result in no files case. - SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[0].getPath(), conf); List<WindowedValue<?>> availableSideInputs = new ArrayList<>(); - BytesWritable value = new BytesWritable(); - while (reader.next(NullWritable.get(), value)) { - ByteArrayInputStream inStream = new ByteArrayInputStream(value.getBytes()); - availableSideInputs.add(elemCoder.decode(inStream)); + if (files.length > 0) { + SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[0].getPath(), conf); + BytesWritable value = new BytesWritable(); + while (reader.next(NullWritable.get(), value)) { + ByteArrayInputStream inStream = new ByteArrayInputStream(value.getBytes()); + availableSideInputs.add(elemCoder.decode(inStream)); + } } Iterable<WindowedValue<?>> sideInputForWindow = Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {