Repository: flume Updated Branches: refs/heads/trunk 30f6e39aa -> 61f43aab5
FLUME-3222 Fix for NoSuchFileException thrown when files are being deleted from the TAILDIR source. We fetch file names from a directory and later we fetch inodes. If there is a delete between these operations, this problem occurs. Reproduced from unit test. Added exception handling to handle this case. It is enough to ignore the NoSuchFileException and continue. Reviewers: Ferenc Szabo, Peter Turcsanyi (Endre Major via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/61f43aab Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/61f43aab Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/61f43aab Branch: refs/heads/trunk Commit: 61f43aab5d240db07ca84d545e9e6c95334df435 Parents: 30f6e39 Author: Ferenc Szabo <[email protected]> Authored: Tue Jun 5 10:29:11 2018 +0200 Committer: Ferenc Szabo <[email protected]> Committed: Tue Jun 5 10:29:11 2018 +0200 ---------------------------------------------------------------------- .../taildir/ReliableTaildirEventReader.java | 9 +++++- .../source/taildir/TestTaildirEventReader.java | 30 ++++++++++++++++---- 2 files changed, 33 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/61f43aab/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 633d3c1..16c9b17 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -38,6 +38,7 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -242,7 +243,13 @@ public class ReliableTaildirEventReader implements ReliableEventReader { Map<String, String> headers = headerTable.row(taildir.getFileGroup()); for (File f : taildir.getMatchingFiles()) { - long inode = getInode(f); + long inode; + try { + inode = getInode(f); + } catch (NoSuchFileException e) { + logger.info("File has been deleted in the meantime: " + e.getMessage()); + continue; + } TailFile tf = tailFiles.get(inode); if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) { long startPos = skipToEnd ? f.length() : 0; http://git-wip-us.apache.org/repos/asf/flume/blob/61f43aab/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java index e75543c..0dd8eb9 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java @@ -69,7 +69,8 @@ public class TestTaildirEventReader { } private ReliableTaildirEventReader getReader(Map<String, String> filePaths, - Table<String, String, String> headerTable, boolean addByteOffset) { + Table<String, String, String> headerTable, boolean addByteOffset, + boolean cachedPatternMatching) { 
ReliableTaildirEventReader reader; try { reader = new ReliableTaildirEventReader.Builder() @@ -78,6 +79,7 @@ public class TestTaildirEventReader { .positionFilePath(posFilePath) .skipToEnd(false) .addByteOffset(addByteOffset) + .cachePatternMatching(cachedPatternMatching) .build(); reader.updateTailFiles(); } catch (IOException ioe) { @@ -86,15 +88,16 @@ public class TestTaildirEventReader { return reader; } - private ReliableTaildirEventReader getReader(boolean addByteOffset) { + private ReliableTaildirEventReader getReader(boolean addByteOffset, + boolean cachedPatternMatching) { Map<String, String> filePaths = ImmutableMap.of("testFiles", tmpDir.getAbsolutePath() + "/file.*"); Table<String, String, String> headerTable = HashBasedTable.create(); - return getReader(filePaths, headerTable, addByteOffset); + return getReader(filePaths, headerTable, addByteOffset, cachedPatternMatching); } private ReliableTaildirEventReader getReader() { - return getReader(false); + return getReader(false, false); } @Before @@ -157,6 +160,23 @@ public class TestTaildirEventReader { } @Test + // Tests deleting a file + public void testDeleteFiles() throws IOException { + File f1 = new File(tmpDir, "file1"); + Files.write("file1line1\nfile1line2\n", f1, Charsets.UTF_8); + + // Caching is used to be able to reproduce the problem when a file is deleted + // right before the inode is fetched + ReliableTaildirEventReader reader = getReader(false, true); + + File dir = f1.getParentFile(); + long lastModified = dir.lastModified(); + f1.delete(); + dir.setLastModified(lastModified - 1000); //subtract a second to be sure the cache is used + reader.updateTailFiles(); + } + + @Test // Make sure this works when there are initially no files // and we finish reading all files and fully commit. 
public void testInitiallyEmptyDirAndBehaviorAfterReadingAll() throws IOException { @@ -459,7 +479,7 @@ public class TestTaildirEventReader { String line3 = "file1line3\n"; Files.write(line1 + line2 + line3, f1, Charsets.UTF_8); - ReliableTaildirEventReader reader = getReader(true); + ReliableTaildirEventReader reader = getReader(true, false); List<String> headers = null; for (TailFile tf : reader.getTailFiles().values()) { headers = headersAsStrings(reader.readEvents(tf, 5), BYTE_OFFSET_HEADER_KEY);
