Repository: flume Updated Branches: refs/heads/trunk 4e06f6fe7 -> 4d2a34e93
FLUME-1934. Spooling Directory Source dies on encountering zero-byte files. (Grant Henke via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4d2a34e9 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4d2a34e9 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4d2a34e9 Branch: refs/heads/trunk Commit: 4d2a34e931554baa1c1b255d95540a46354a521f Parents: 4e06f6f Author: Hari Shreedharan <[email protected]> Authored: Mon Apr 6 18:14:04 2015 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Apr 6 18:14:04 2015 -0700 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 6 ++-- .../flume/source/TestSpoolDirectorySource.java | 30 ++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4d2a34e9/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index 27e9c1e..d54f415 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -252,8 +252,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { List<Event> events = des.readEvents(numEvents); /* It's possible that the last read took us just up to a file boundary. - * If so, try to roll to the next file, if there is one. */ - if (events.isEmpty()) { + * If so, try to roll to the next file, if there is one. + * Loop until events is not empty or there is no next file in case of 0 byte files */ + while (events.isEmpty()) { + logger.info("Last read took us just up to a file boundary. Rolling to the next file, if there is one."); retireCurrentFile(); currentFile = getNextFile(); if (!currentFile.isPresent()) { http://git-wip-us.apache.org/repos/asf/flume/blob/4d2a34e9/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 89e7c8c..fe530ff 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -283,4 +283,34 @@ public class TestSpoolDirectorySource { Assert.assertEquals(8, dataOut.size()); source.stop(); } + + @Test + public void testEndWithZeroByteFiles() throws IOException, InterruptedException { + Context context = new Context(); + + File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); + + Files.write("file1line1\n", f1, Charsets.UTF_8); + + File f2 = new File(tmpDir.getAbsolutePath() + "/file2"); + File f3 = new File(tmpDir.getAbsolutePath() + "/file3"); + File f4 = new File(tmpDir.getAbsolutePath() + "/file4"); + + Files.touch(f2); + Files.touch(f3); + Files.touch(f4); + + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + Configurables.configure(source, context); + source.start(); + + // Need better way to ensure all files were processed. + Thread.sleep(5000); + + Assert.assertFalse("Server did not error", source.hasFatalError()); + Assert.assertEquals("One message was read", 1, + source.getSourceCounter().getEventAcceptedCount()); + source.stop(); + } }
