Repository: flume Updated Branches: refs/heads/trunk c718dae09 -> dfa062757
FLUME-3083. Check byte position of file in update condition of Taildir Source This patch addresses an edge case of the Taildir Source wherein it can miss reading events written in the same second as the file closing. This closes #128 Reviewers: Satoshi Iijima, Bessenyei Balázs Donát (eskrm via Bessenyei Balázs Donát) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/dfa06275 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/dfa06275 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/dfa06275 Branch: refs/heads/trunk Commit: dfa0627573b9a75a25dc7149a7d63c9bac953ff4 Parents: c718dae Author: eskrm <[email protected]> Authored: Sun Apr 9 13:02:54 2017 -0700 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Thu Apr 27 19:03:46 2017 +0000 ---------------------------------------------------------------------- .../taildir/ReliableTaildirEventReader.java | 2 +- .../source/taildir/TestTaildirEventReader.java | 25 ++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/dfa06275/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 8838320..633d3c1 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -248,7 +248,7 @@ public class ReliableTaildirEventReader implements ReliableEventReader { long startPos = skipToEnd ? f.length() : 0; tf = openFile(f, headers, inode, startPos); } else { - boolean updated = tf.getLastUpdated() < f.lastModified(); + boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length(); if (updated) { if (tf.getRaf() == null) { tf = openFile(f, headers, inode, tf.getPos()); http://git-wip-us.apache.org/repos/asf/flume/blob/dfa06275/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java index bcfe4bb..e75543c 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirEventReader.java @@ -493,4 +493,29 @@ public class TestTaildirEventReader { assertTrue(out.contains("file1line3")); assertTrue(out.contains("file1line4")); } + + @Test + // Ensure tail file is set to be read when its last updated time + // equals the underlying file's modification time and there are + // pending bytes to be read. + public void testUpdateWhenLastUpdatedSameAsModificationTime() throws IOException { + File file = new File(tmpDir, "file"); + Files.write("line1\n", file, Charsets.UTF_8); + + ReliableTaildirEventReader reader = getReader(); + for (TailFile tf : reader.getTailFiles().values()) { + reader.readEvents(tf, 1); + reader.commit(); + } + + Files.append("line2\n", file, Charsets.UTF_8); + for (TailFile tf : reader.getTailFiles().values()) { + tf.setLastUpdated(file.lastModified()); + } + + reader.updateTailFiles(); + for (TailFile tf : reader.getTailFiles().values()) { + assertEquals(true, tf.needTail()); + } + } }
