Repository: flume Updated Branches: refs/heads/trunk a9a775a9c -> b6dede8c1 (forced update)
FLUME-2318: Make SpoolingDirectorySource able to handle empty files (Muhammad Ehsan ul Haque and Bessenyei Balázs Donát via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b6dede8c Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b6dede8c Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b6dede8c Branch: refs/heads/trunk Commit: b6dede8c108e0d8e3516767de31e3a847c69cfe0 Parents: c1fae53 Author: Bessenyei Balázs Donát <[email protected]> Authored: Wed Aug 17 09:52:21 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Aug 17 10:35:38 2016 -0700 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 73 +++++++++++------ .../flume/source/TestSpoolDirectorySource.java | 83 +++++++++++++++++--- 2 files changed, 120 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b6dede8c/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index 01381a5..a0f929c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -28,6 +28,7 @@ import org.apache.flume.Event; import org.apache.flume.FlumeException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; +import org.apache.flume.event.EventBuilder; import org.apache.flume.serialization.DecodeErrorPolicy; import org.apache.flume.serialization.DurablePositionTracker; import org.apache.flume.serialization.EventDeserializer; @@ -58,20 +59,18 @@ import java.util.Locale; import java.util.regex.Pattern; /** - * <p/>A {@link ReliableEventReader} which reads log data from files stored + * <p>A {@link ReliableEventReader} which reads log data from files stored * in a spooling directory and renames each file once all of its data has been * read (through {@link EventDeserializer#readEvent()} calls). The user must * {@link #commit()} each read, to indicate that the lines have been fully * processed. - * <p/>Read calls will return no data if there are no files left to read. This + * <p>Read calls will return no data if there are no files left to read. This * class, in general, is not thread safe. - * - * <p/>This reader assumes that files with unique file names are left in the + * <p>This reader assumes that files with unique file names are left in the * spooling directory and not modified once they are placed there. Any user * behavior which violates these assumptions, when detected, will result in a * FlumeException being thrown. - * - * <p/>This class makes the following guarantees, if above assumptions are met: + * <p>This class makes the following guarantees, if above assumptions are met: * <ul> * <li> Once a log file has been renamed with the {@link #completedSuffix}, * all of its records have been read through the @@ -106,11 +105,12 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private final boolean recursiveDirectorySearch; private Optional<FileInfo> currentFile = Optional.absent(); - /** Always contains the last file from which lines have been read. **/ + /** Always contains the last file from which lines have been read. */ private Optional<FileInfo> lastFileRead = Optional.absent(); private boolean committed = true; + private boolean firstTimeRead = true; - /** Instance var to Cache directory listing **/ + /** Instance var to Cache directory listing */ private Iterator<File> candidateFileIter = null; private int listFilesCount = 0; @@ -220,6 +220,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { /** * Recursively gather candidate files + * * @param directory the directory to gather files from * @return list of files within the passed in directory */ @@ -269,9 +270,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return listFilesCount; } - /** Return the filename which generated the data from the last successful + /** + * Return the filename which generated the data from the last successful * {@link #readEvents(int)} call. Returns null if called before any file - * contents are read. */ + * contents are read. + */ public String getLastFileRead() { if (!lastFileRead.isPresent()) { return null; @@ -308,8 +311,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } - EventDeserializer des = currentFile.get().getDeserializer(); - List<Event> events = des.readEvents(numEvents); + List<Event> events = readDeserializerEvents(numEvents); /* It's possible that the last read took us just up to a file boundary. * If so, try to roll to the next file, if there is one. @@ -322,9 +324,27 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { if (!currentFile.isPresent()) { return Collections.emptyList(); } - events = currentFile.get().getDeserializer().readEvents(numEvents); + events = readDeserializerEvents(numEvents); + } + + fillHeader(events); + + committed = false; + lastFileRead = currentFile; + return events; + } + + private List<Event> readDeserializerEvents(int numEvents) throws IOException { + EventDeserializer des = currentFile.get().getDeserializer(); + List<Event> events = des.readEvents(numEvents); + if (events.isEmpty() && firstTimeRead) { + events.add(EventBuilder.withBody(new byte[0])); } + firstTimeRead = false; + return events; + } + private void fillHeader(List<Event> events) { if (annotateFileName) { String filename = currentFile.get().getFile().getAbsolutePath(); for (Event event : events) { @@ -338,10 +358,6 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { event.getHeaders().put(baseNameHeader, basename); } } - - committed = false; - lastFileRead = currentFile; - return events; } @Override @@ -352,7 +368,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } - /** Commit the last lines which were read. */ + /** + * Commit the last lines which were read. + */ @Override public void commit() throws IOException { if (!committed && currentFile.isPresent()) { @@ -363,11 +381,12 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { /** * Closes currentFile and attempt to rename it. - * + * <p> * If these operations fail in a way that may cause duplicate log entries, * an error is logged but no exceptions are thrown. If these operations fail * in a way that indicates potential misuse of the spooling directory, a * FlumeException will be thrown. + * * @throws FlumeException if files do not conform to spooling assumptions */ private void retireCurrentFile() throws IOException { @@ -400,6 +419,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { /** * Rename the given spooled file + * * @param fileToRoll * @throws IOException */ @@ -432,13 +452,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { throw new IllegalStateException(message); } - // Dest file exists and not on windows + // Dest file exists and not on windows } else if (dest.exists()) { String message = "File name has been re-used with different" + " files. Spooling assumptions violated for " + dest; throw new IllegalStateException(message); - // Destination file does not already exist. We are good to go! + // Destination file does not already exist. We are good to go! } else { boolean renamed = fileToRoll.renameTo(dest); if (renamed) { @@ -460,6 +480,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { /** * Delete the given spooled file + * * @param fileToDelete * @throws IOException */ @@ -508,7 +529,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random. return openFile(selectedFile); } else if (consumeOrder == ConsumeOrder.YOUNGEST) { - for (File candidateFile: candidateFiles) { + for (File candidateFile : candidateFiles) { long compare = selectedFile.lastModified() - candidateFile.lastModified(); if (compare == 0) { // ts is same pick smallest lexicographically. @@ -518,7 +539,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } } else { // default order is OLDEST - for (File candidateFile: candidateFiles) { + for (File candidateFile : candidateFiles) { long compare = selectedFile.lastModified() - candidateFile.lastModified(); if (compare == 0) { // ts is same pick smallest lexicographically. @@ -529,6 +550,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } + firstTimeRead = true; + return openFile(selectedFile); } @@ -538,13 +561,15 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } return f2; } + /** * Opens a file for consuming + * * @param file * @return {@link FileInfo} for the file to consume or absent option if the * file does not exists or readable. */ - private Optional<FileInfo> openFile(File file) { + private Optional<FileInfo> openFile(File file) { try { // roll the meta file, if needed String nextPath = file.getPath(); http://git-wip-us.apache.org/repos/asf/flume/blob/b6dede8c/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 0182d21..92a698d 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -120,8 +120,8 @@ public class TestSpoolDirectorySource { File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", - f1, Charsets.UTF_8); + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); @@ -156,8 +156,8 @@ public class TestSpoolDirectorySource { File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", - f1, Charsets.UTF_8); + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); @@ -256,7 +256,6 @@ public class TestSpoolDirectorySource { boolean directoriesCreated = subDir.mkdirs(); Assert.assertTrue("source directories must be created", directoriesCreated); - File f1 = new File(subDir.getAbsolutePath() + "/file1.txt"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + @@ -364,9 +363,8 @@ public class TestSpoolDirectorySource { File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", - f1, Charsets.UTF_8); - + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); @@ -384,8 +382,7 @@ public class TestSpoolDirectorySource { } Assert.assertTrue("Expected to hit ChannelFullException, but did not!", - source.didHitChannelFullException()); - + source.didHitChannelFullException()); List<String> dataOut = Lists.newArrayList(); @@ -434,8 +431,70 @@ public class TestSpoolDirectorySource { Thread.sleep(5000); Assert.assertFalse("Server did not error", source.hasFatalError()); - Assert.assertEquals("One message was read", - 1, source.getSourceCounter().getEventAcceptedCount()); + Assert.assertEquals("Four messages were read", + 4, source.getSourceCounter().getEventAcceptedCount()); + source.stop(); + } + + @Test + public void testWithAllEmptyFiles() + throws InterruptedException, IOException { + Context context = new Context(); + File[] f = new File[10]; + for (int i = 0; i < 10; i++) { + f[i] = new File(tmpDir.getAbsolutePath() + "/file" + i); + Files.write(new byte[0], f[i]); + } + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER, + "true"); + context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY, + "fileHeaderKeyTest"); + Configurables.configure(source, context); + source.start(); + Thread.sleep(10); + for (int i = 0; i < 10; i++) { + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + Assert.assertNotNull("Event must not be null", e); + Assert.assertNotNull("Event headers must not be null", e.getHeaders()); + Assert.assertNotNull(e.getHeaders().get("fileHeaderKeyTest")); + Assert.assertEquals(f[i].getAbsolutePath(), + e.getHeaders().get("fileHeaderKeyTest")); + Assert.assertArrayEquals(new byte[0], e.getBody()); + txn.commit(); + txn.close(); + } + source.stop(); + } + + @Test + public void testWithEmptyAndDataFiles() + throws InterruptedException, IOException { + Context context = new Context(); + File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); + Files.write("some data".getBytes(), f1); + File f2 = new File(tmpDir.getAbsolutePath() + "/file2"); + Files.write(new byte[0], f2); + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + Configurables.configure(source, context); + source.start(); + Thread.sleep(10); + for (int i = 0; i < 2; i++) { + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + txn.commit(); + txn.close(); + } + Transaction txn = channel.getTransaction(); + txn.begin(); + Assert.assertNull(channel.take()); + txn.commit(); + txn.close(); source.stop(); } }
