Repository: flink Updated Branches: refs/heads/master 706dc131c -> b949d42d9
[FLINK-4777] catch IOException in ContinuousFileMonitoringFunction FileSystem.listStatus(path) may throw an IOException when it lists files and then retrieves their file status, because files may be moved while their status is being listed. This is quite common, e.g. with editors that create temporary files and then move them. The ContinuousFileMonitoringFunction can only apply its file path filter after the listing has succeeded, so it cannot exclude such files up front. The solution is to catch the exception and defer the check for eligible files to a later check, once the listing completes without an exception. This closes #2610. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b949d42d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b949d42d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b949d42d Branch: refs/heads/master Commit: b949d42d9f16fe52126f03a960fdf23ad71e6e63 Parents: 706dc13 Author: Maximilian Michels <[email protected]> Authored: Mon Oct 10 10:06:18 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon Oct 10 10:49:49 2016 +0200 ---------------------------------------------------------------------- .../source/ContinuousFileMonitoringFunction.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b949d42d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index d36daab..f9ef565 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -244,12 +244,21 @@ public class ContinuousFileMonitoringFunction<OUT> * method to decide which parts of the file to be processed, and forward them downstream. */ private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException { - List<FileStatus> files = new ArrayList<>(); - FileStatus[] statuses = fileSystem.listStatus(new Path(path)); + final FileStatus[] statuses; + try { + statuses = fileSystem.listStatus(new Path(path)); + } catch (IOException e) { + // we may run into an IOException if files are moved while listing their status + // delay the check for eligible files in this case + return Collections.emptyList(); + } + if (statuses == null) { LOG.warn("Path does not exist: {}", path); + return Collections.emptyList(); } else { + List<FileStatus> files = new ArrayList<>(); // handle the new files for (FileStatus status : statuses) { Path filePath = status.getPath(); @@ -258,8 +267,8 @@ public class ContinuousFileMonitoringFunction<OUT> files.add(status); } } + return files; } - return files; } /**
