Repository: flink
Updated Branches:
  refs/heads/master 706dc131c -> b949d42d9


[FLINK-4777] catch IOException in ContinuousFileMonitoringFunction

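FileSystem.listStatus(path) lists the files in a directory and then
retrieves the status of each one. If a file is moved or deleted between
these two steps, the call throws an IOException. This is quite common,
e.g. with editors that create temporary files and then move them. The
ContinuousFileMonitoringFunction cannot exclude such files up front,
because it can only apply its file path filter after listStatus returns.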

The fix catches the IOException and returns an empty list for the current
check, deferring the file checks until a listing completes without an
exception.
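A minimal, self-contained sketch of the same "defer on IOException" pattern, written against java.nio.file rather than Flink's FileSystem so it runs standalone. The class and method names are hypothetical; as in the patch, a failure anywhere in the listing discards the whole result for this round, and the filter is applied only after each entry's status has been read.

```java
import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical helper illustrating the FLINK-4777 pattern with java.nio.file.
final class EligibleFileLister {

    private EligibleFileLister() {}

    /**
     * Lists regular files in dir that pass filter. If listing the directory or
     * reading an entry's attributes fails (e.g. a temporary file was renamed
     * after it was listed), returns an empty list so the caller can retry later.
     */
    static List<Path> listEligibleFiles(Path dir, Predicate<Path> filter) {
        final List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                // May throw NoSuchFileException (an IOException) if the entry vanished.
                BasicFileAttributes attrs =
                        Files.readAttributes(entry, BasicFileAttributes.class);
                // The filter can only run once the status is known.
                if (attrs.isRegularFile() && filter.test(entry)) {
                    files.add(entry);
                }
            }
        } catch (IOException | DirectoryIteratorException e) {
            // Inconclusive listing: defer the check to the next monitoring round.
            return Collections.emptyList();
        }
        return files;
    }
}
```

A missing directory also ends up as an empty list here, because Files.newDirectoryStream throws NoSuchFileException; the patched Flink code reaches the same result through its `statuses == null` branch.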

This closes #2610.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b949d42d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b949d42d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b949d42d

Branch: refs/heads/master
Commit: b949d42d9f16fe52126f03a960fdf23ad71e6e63
Parents: 706dc13
Author: Maximilian Michels <[email protected]>
Authored: Mon Oct 10 10:06:18 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon Oct 10 10:49:49 2016 +0200

----------------------------------------------------------------------
 .../source/ContinuousFileMonitoringFunction.java     | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b949d42d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
index d36daab..f9ef565 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
@@ -244,12 +244,21 @@ public class ContinuousFileMonitoringFunction<OUT>
         * method to decide which parts of the file to be processed, and forward them downstream.
         */
        private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException {
-               List<FileStatus> files = new ArrayList<>();
 
-               FileStatus[] statuses = fileSystem.listStatus(new Path(path));
+               final FileStatus[] statuses;
+               try {
+                       statuses = fileSystem.listStatus(new Path(path));
+               } catch (IOException e) {
+                       // we may run into an IOException if files are moved while listing their status
+                       // delay the check for eligible files in this case
+                       return Collections.emptyList();
+               }
+
                if (statuses == null) {
                        LOG.warn("Path does not exist: {}", path);
+                       return Collections.emptyList();
                } else {
+                       List<FileStatus> files = new ArrayList<>();
                        // handle the new files
                        for (FileStatus status : statuses) {
                                Path filePath = status.getPath();
@@ -258,8 +267,8 @@ public class ContinuousFileMonitoringFunction<OUT>
                                        files.add(status);
                                }
                        }
+                       return files;
                }
-               return files;
        }
 
        /**
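Returning an empty list only defers work if the caller checks again. The hypothetical sketch below (DirectoryLister and PollingMonitor are illustrative names, not Flink classes) shows a polling loop in which a failed listing yields nothing for that round and the files are picked up by the next successful listing. It assumes, without reproducing Flink's internals, that the monitor records progress only from files it actually saw, so a skipped round loses nothing.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a directory listing call that may fail transiently.
interface DirectoryLister {
    List<String> list() throws IOException;
}

// Hypothetical monitor: each poll() is one monitoring round.
final class PollingMonitor {
    private final DirectoryLister lister;
    private final Set<String> seen = new HashSet<>();

    PollingMonitor(DirectoryLister lister) {
        this.lister = lister;
    }

    /** Returns files not reported before, or nothing if this round's listing failed. */
    List<String> poll() {
        final List<String> listed;
        try {
            listed = lister.list();
        } catch (IOException e) {
            // Defer: report nothing now; the next round lists the directory again.
            return Collections.emptyList();
        }
        List<String> fresh = new ArrayList<>();
        for (String file : listed) {
            // Set.add returns true only for files not seen in earlier rounds.
            if (seen.add(file)) {
                fresh.add(file);
            }
        }
        return fresh;
    }
}
```

Because the failed round does not mark anything as seen, the second poll reports every file that the first poll would have returned had the listing succeeded.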
