Repository: flink Updated Branches: refs/heads/release-1.1 bab59dfa7 -> 7267562bb
[FLINK-4777] catch IOException in ContinuousFileMonitoringFunction FileSystem.listStatus(path) may throw an IOException when it lists files and then retrieves their file status, because a file can be moved between the two steps. This is quite common, e.g. with editors that create temporary files and then move them. The ContinuousFileMonitoringFunction can only apply a file path filter afterwards. The solution is to skip the current check and defer it until the listing succeeds without an exception. This closes #2610. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7267562b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7267562b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7267562b Branch: refs/heads/release-1.1 Commit: 7267562bb8110e3f7300007e996ce96355d37c59 Parents: bab59df Author: Maximilian Michels <[email protected]> Authored: Fri Oct 7 20:06:18 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon Oct 10 10:46:41 2016 +0200 ---------------------------------------------------------------------- .../source/ContinuousFileMonitoringFunction.java | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7267562b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 8ff4a2a..4b2fbe1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -246,12 +246,21 @@ public class ContinuousFileMonitoringFunction<OUT> * method to decide which parts of the file to be processed, and forward them downstream. */ private List<FileStatus> listEligibleFiles(FileSystem fileSystem) throws IOException { - List<FileStatus> files = new ArrayList<>(); - FileStatus[] statuses = fileSystem.listStatus(new Path(path)); + final FileStatus[] statuses; + try { + statuses = fileSystem.listStatus(new Path(path)); + } catch (IOException e) { + // we may run into an IOException if files are moved while listing their status + // delay the check for eligible files in this case + return Collections.emptyList(); + } + if (statuses == null) { LOG.warn("Path does not exist: {}", path); + return Collections.emptyList(); } else { + List<FileStatus> files = new ArrayList<>(); // handle the new files for (FileStatus status : statuses) { Path filePath = status.getPath(); @@ -260,8 +269,8 @@ public class ContinuousFileMonitoringFunction<OUT> files.add(status); } } + return files; } - return files; } /**
