mr-runner: handle no files case in FileSideInputReader for empty views.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ca0b15ad
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ca0b15ad
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ca0b15ad

Branch: refs/heads/mr-runner
Commit: ca0b15ada6cdbdfba9ac8adb0b8c874477587fae
Parents: 807f903
Author: Pei He <p...@apache.org>
Authored: Thu Aug 31 17:29:04 2017 +0800
Committer: Pei He <p...@apache.org>
Committed: Fri Sep 1 17:13:40 2017 +0800

----------------------------------------------------------------------
 .../mapreduce/translation/FileSideInputReader.java     | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ca0b15ad/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
index cb3a8c4..403de4e 100644
--- a/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
+++ b/runners/map-reduce/src/main/java/org/apache/beam/runners/mapreduce/translation/FileSideInputReader.java
@@ -76,14 +76,15 @@ public class FileSideInputReader implements SideInputReader {
     try {
       FileSystem fs = pattern.getFileSystem(conf);
       FileStatus[] files = fs.globStatus(pattern);
-      // TODO: handle empty views which may result in no files case.
-      SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[0].getPath(), conf);
 
       List<WindowedValue<?>> availableSideInputs = new ArrayList<>();
-      BytesWritable value = new BytesWritable();
-      while (reader.next(NullWritable.get(), value)) {
-        ByteArrayInputStream inStream = new ByteArrayInputStream(value.getBytes());
-        availableSideInputs.add(elemCoder.decode(inStream));
+      if (files.length > 0) {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, files[0].getPath(), conf);
+        BytesWritable value = new BytesWritable();
+        while (reader.next(NullWritable.get(), value)) {
+          ByteArrayInputStream inStream = new ByteArrayInputStream(value.getBytes());
+          availableSideInputs.add(elemCoder.decode(inStream));
+        }
       }
       Iterable<WindowedValue<?>> sideInputForWindow =
           Iterables.filter(availableSideInputs, new Predicate<WindowedValue<?>>() {

Reply via email to