Repository: flume Updated Branches: refs/heads/trunk 4eacba193 -> 2fe393898
FLUME-1899. Make SpoolDir work with subdirectories (Phil Scala and Bessenyei Balázs Donát via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2fe39389 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2fe39389 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2fe39389 Branch: refs/heads/trunk Commit: 2fe393898f420d100117ca277cce198858e2c24f Parents: 4eacba1 Author: Mike Percy <[email protected]> Authored: Tue Jun 28 13:44:51 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Tue Jun 28 13:44:51 2016 -0700 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 103 ++++++++---- .../flume/source/SpoolDirectorySource.java | 14 +- ...olDirectorySourceConfigurationConstants.java | 12 +- .../flume/source/TestSpoolDirectorySource.java | 155 +++++++++++++++++-- flume-ng-doc/sphinx/FlumeUserGuide.rst | 88 ++++++----- 5 files changed, 281 insertions(+), 91 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2fe39389/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index d54f415..36d80f0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -80,7 +80,6 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { .getLogger(ReliableSpoolingFileEventReader.class); static final String metaFileName = ".flumespool-main.meta"; - private final File 
spoolDirectory; private final String completedSuffix; private final String deserializerType; @@ -94,8 +93,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private final String deletePolicy; private final Charset inputCharset; private final DecodeErrorPolicy decodeErrorPolicy; - private final ConsumeOrder consumeOrder; - + private final ConsumeOrder consumeOrder; + private final boolean recursiveDirectorySearch; + private Optional<FileInfo> currentFile = Optional.absent(); /** Always contains the last file from which lines have been read. **/ private Optional<FileInfo> lastFileRead = Optional.absent(); @@ -114,8 +114,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { boolean annotateBaseName, String baseNameHeader, String deserializerType, Context deserializerContext, String deletePolicy, String inputCharset, - DecodeErrorPolicy decodeErrorPolicy, - ConsumeOrder consumeOrder) throws IOException { + DecodeErrorPolicy decodeErrorPolicy, + ConsumeOrder consumeOrder, + boolean recursiveDirectorySearch) throws IOException { // Sanity checks Preconditions.checkNotNull(spoolDirectory); @@ -175,7 +176,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { this.deletePolicy = deletePolicy; this.inputCharset = Charset.forName(inputCharset); this.decodeErrorPolicy = Preconditions.checkNotNull(decodeErrorPolicy); - this.consumeOrder = Preconditions.checkNotNull(consumeOrder); + this.consumeOrder = Preconditions.checkNotNull(consumeOrder); + this.recursiveDirectorySearch = recursiveDirectorySearch; File trackerDirectory = new File(trackerDirPath); @@ -199,11 +201,60 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } this.metaFile = new File(trackerDirectory, metaFileName); + if(metaFile.exists() && metaFile.length() == 0) { deleteMetaFile(); } } + /** + * Filter to exclude files/directories either hidden, finished, or names matching the ignore pattern + 
*/ + final FileFilter filter = new FileFilter() { + public boolean accept(File candidate) { + if (candidate.isDirectory()) { + String directoryName = candidate.getName(); + if (!recursiveDirectorySearch || directoryName.startsWith(".") || + ignorePattern.matcher(directoryName).matches()) { + + return false; + } + return true; + } + String fileName = candidate.getName(); + if (fileName.endsWith(completedSuffix) || fileName.startsWith(".") || + ignorePattern.matcher(fileName).matches()) { + return false; + } + + return true; + } + }; + + /** + * Recursively gather candidate files + * @param directory the directory to gather files from + * @return list of files within the passed in directory + */ + private List<File> getCandidateFiles(File directory) { + Preconditions.checkNotNull(directory); + List<File> candidateFiles = new ArrayList<File>(); + if (!directory.isDirectory()) { + return candidateFiles; + } + + for(File file : directory.listFiles(filter)){ + if (file.isDirectory()) { + candidateFiles.addAll(getCandidateFiles(file)); + } + else { + candidateFiles.add(file); + } + } + + return candidateFiles; + } + @VisibleForTesting int getListFilesCount() { return listFilesCount; @@ -432,22 +483,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { List<File> candidateFiles = Collections.emptyList(); if (consumeOrder != ConsumeOrder.RANDOM || - candidateFileIter == null || - !candidateFileIter.hasNext()) { - /* Filter to exclude finished or hidden files */ - FileFilter filter = new FileFilter() { - public boolean accept(File candidate) { - String fileName = candidate.getName(); - if ((candidate.isDirectory()) || - (fileName.endsWith(completedSuffix)) || - (fileName.startsWith(".")) || - ignorePattern.matcher(fileName).matches()) { - return false; - } - return true; - } - }; - candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter)); + candidateFileIter == null || + !candidateFileIter.hasNext()) { + candidateFiles = 
getCandidateFiles(spoolDirectory); listFilesCount++; candidateFileIter = candidateFiles.iterator(); } @@ -594,9 +632,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private DecodeErrorPolicy decodeErrorPolicy = DecodeErrorPolicy.valueOf( SpoolDirectorySourceConfigurationConstants.DEFAULT_DECODE_ERROR_POLICY .toUpperCase(Locale.ENGLISH)); - private ConsumeOrder consumeOrder = - SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER; - + private ConsumeOrder consumeOrder = + SpoolDirectorySourceConfigurationConstants.DEFAULT_CONSUME_ORDER; + private boolean recursiveDirectorySearch = + SpoolDirectorySourceConfigurationConstants.DEFAULT_RECURSIVE_DIRECTORY_SEARCH; + public Builder spoolDirectory(File directory) { this.spoolDirectory = directory; return this; @@ -657,22 +697,27 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return this; } + public Builder recursiveDirectorySearch(boolean recursiveDirectorySearch) { + this.recursiveDirectorySearch = recursiveDirectorySearch; + return this; + } + public Builder decodeErrorPolicy(DecodeErrorPolicy decodeErrorPolicy) { this.decodeErrorPolicy = decodeErrorPolicy; return this; } - + public Builder consumeOrder(ConsumeOrder consumeOrder) { this.consumeOrder = consumeOrder; return this; - } - + } + public ReliableSpoolingFileEventReader build() throws IOException { return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix, ignorePattern, trackerDirPath, annotateFileName, fileNameHeader, annotateBaseName, baseNameHeader, deserializerType, deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy, - consumeOrder); + consumeOrder, recursiveDirectorySearch); } } http://git-wip-us.apache.org/repos/asf/flume/blob/2fe39389/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 3fe947d..3af3e53 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -70,6 +70,7 @@ Configurable, EventDrivenSource { private int maxBackoff; private ConsumeOrder consumeOrder; private int pollDelay; + private boolean recursiveDirectorySearch; @Override public synchronized void start() { @@ -95,6 +96,7 @@ Configurable, EventDrivenSource { .inputCharset(inputCharset) .decodeErrorPolicy(decodeErrorPolicy) .consumeOrder(consumeOrder) + .recursiveDirectorySearch(recursiveDirectorySearch) .build(); } catch (IOException ioe) { throw new FlumeException("Error instantiating spooling event parser", @@ -162,12 +164,15 @@ Configurable, EventDrivenSource { deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER); deserializerContext = new Context(context.getSubProperties(DESERIALIZER + ".")); - - consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER, + + consumeOrder = ConsumeOrder.valueOf(context.getString(CONSUME_ORDER, DEFAULT_CONSUME_ORDER.toString()).toUpperCase(Locale.ENGLISH)); pollDelay = context.getInteger(POLL_DELAY, DEFAULT_POLL_DELAY); + recursiveDirectorySearch = context.getBoolean(RECURSIVE_DIRECTORY_SEARCH, + DEFAULT_RECURSIVE_DIRECTORY_SEARCH); + // "Hack" to support backwards compatibility with previous generation of // spooling directory source, which did not support deserializers Integer bufferMaxLineLength = context.getInteger(BUFFER_MAX_LINE_LENGTH); @@ -210,6 +215,11 @@ Configurable, EventDrivenSource { return sourceCounter; } + @VisibleForTesting + protected boolean getRecursiveDirectorySearch() { + return recursiveDirectorySearch; + } + private class SpoolDirectoryRunnable implements Runnable { private ReliableSpoolingFileEventReader 
reader; private SourceCounter sourceCounter; http://git-wip-us.apache.org/repos/asf/flume/blob/2fe39389/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java index 5053697..32b7837 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java @@ -92,8 +92,16 @@ public class SpoolDirectorySourceConfigurationConstants { OLDEST, YOUNGEST, RANDOM } public static final String CONSUME_ORDER = "consumeOrder"; - public static final ConsumeOrder DEFAULT_CONSUME_ORDER = ConsumeOrder.OLDEST; - + public static final ConsumeOrder DEFAULT_CONSUME_ORDER = ConsumeOrder.OLDEST; + + /** + * Flag to indicate if we should recursively check for new files. The + * default is false, so a configuration file entry would be needed to enable + * this setting. + */ + public static final String RECURSIVE_DIRECTORY_SEARCH = "recursiveDirectorySearch"; + public static final boolean DEFAULT_RECURSIVE_DIRECTORY_SEARCH = false; + /** Delay(in milliseconds) used when polling for new files.
The default is 500ms */ public static final String POLL_DELAY = "pollDelay"; public static final int DEFAULT_POLL_DELAY = 500; http://git-wip-us.apache.org/repos/asf/flume/blob/2fe39389/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index fe530ff..47fdc7a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -17,15 +17,16 @@ package org.apache.flume.source; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; import org.apache.flume.Channel; -import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; @@ -34,7 +35,6 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; -import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleState; import org.junit.After; @@ -69,34 +69,47 @@ public class TestSpoolDirectorySource { @After public void tearDown() { - for (File f : tmpDir.listFiles()) { - f.delete(); - } + deleteFiles(tmpDir); tmpDir.delete(); } + /** + * Helper method to recursively clean up testing directory + * @param directory the directory to clean up + */ + private void deleteFiles(File directory) { + for (File f : directory.listFiles()) { 
+ if (f.isDirectory()) { + deleteFiles(f); + f.delete(); + } else { + f.delete(); + } + } + } + @Test (expected = IllegalArgumentException.class) public void testInvalidSortOrder() { Context context = new Context(); - context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); - context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, + context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, "undefined"); - Configurables.configure(source, context); + Configurables.configure(source, context); } - + @Test public void testValidSortOrder() { Context context = new Context(); - context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, "oLdESt"); Configurables.configure(source, context); - context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, + context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, "yoUnGest"); Configurables.configure(source, context); - context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, + context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, "rAnDom"); Configurables.configure(source, context); } @@ -134,6 +147,9 @@ public class TestSpoolDirectorySource { txn.close(); } + /** + * Tests if SpoolDirectorySource sets basename headers on events correctly + */ @Test public void testPutBasenameHeader() throws IOException, InterruptedException { @@ -168,14 +184,121 @@ public class TestSpoolDirectorySource { txn.close(); } + /** + * Tests SpoolDirectorySource with parameter recursion set to true + */ + @Test + public void testRecursion_SetToTrue() throws IOException, InterruptedException { + File subDir = new File(tmpDir, "directorya/directoryb/directoryc"); + boolean 
directoriesCreated = subDir.mkdirs(); + Assert.assertTrue("source directories must be created", directoriesCreated); + + final String FILE_NAME = "recursion_file.txt"; + File f1 = new File(subDir, FILE_NAME); + String origBody = "file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + "file1line5\nfile1line6\nfile1line7\nfile1line8\n"; + Files.write(origBody, f1, Charsets.UTF_8); + + Context context = new Context(); + context.put(SpoolDirectorySourceConfigurationConstants.RECURSIVE_DIRECTORY_SEARCH, + "true"); // enable recursion, so we should find the file we created above + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); // spool set to root dir + context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER, + "true"); // put the file name in the "file" header + + Configurables.configure(source, context); + source.start(); + Assert.assertTrue("Recursion setting in source is correct", + source.getRecursiveDirectorySearch()); + + + Transaction txn = channel.getTransaction(); + txn.begin(); + long startTime = System.currentTimeMillis(); + Event e = null; + while (System.currentTimeMillis() - startTime < 300 && e == null) { + e = channel.take(); + Thread.sleep(10); + } + + Assert.assertNotNull("Event must not be null", e); + + Assert.assertNotNull("Event headers must not be null", e.getHeaders()); + Assert.assertTrue("File header value did not end with expected filename", + e.getHeaders().get("file").endsWith(FILE_NAME)); + + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + do { // collecting the whole body + baos.write(e.getBody()); + baos.write('\n'); // newline characters are consumed in the process + e = channel.take(); + } while(e != null); + + Assert.assertEquals("Event body is correct", + Arrays.toString(origBody.getBytes()), + Arrays.toString(baos.toByteArray())); + txn.commit(); + txn.close(); + } + + + /** + * This test will place a file into a sub-directory of the spool directory + * 
since the recursion setting is false there should not be any transactions + * to take from the channel. The 500 ms is arbitrary and simply follows + * what the other tests use to "assume" that since there is no data then this worked. + */ + @Test + public void testRecursion_SetToFalse() throws IOException, InterruptedException { + Context context = new Context(); + + File subDir = new File(tmpDir, "directory"); + boolean directoriesCreated = subDir.mkdirs(); + Assert.assertTrue("source directories must be created", directoriesCreated); + + + + File f1 = new File(subDir.getAbsolutePath() + "/file1.txt"); + + Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", f1, Charsets.UTF_8); + + context.put(SpoolDirectorySourceConfigurationConstants.RECURSIVE_DIRECTORY_SEARCH, + "false"); + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER, + "true"); + context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER_KEY, + "fileHeaderKeyTest"); + + Configurables.configure(source, context); + source.start(); + // check the source to ensure the setting has been set via the context object + Assert.assertFalse("Recursion setting in source is not set to false (this" + + "test does not want recursion enabled)", source.getRecursiveDirectorySearch()); + + Transaction txn = channel.getTransaction(); + txn.begin(); + long startTime = System.currentTimeMillis(); + Event e = null; + while (System.currentTimeMillis() - startTime < 300 && e == null) { + e = channel.take(); + Thread.sleep(10); + } + Assert.assertNull("Event must be null", e); + txn.commit(); + txn.close(); + } + @Test public void testLifecycle() throws IOException, InterruptedException { Context context = new Context(); File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); 
Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", - f1, Charsets.UTF_8); + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); @@ -243,8 +366,8 @@ public class TestSpoolDirectorySource { File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", - f1, Charsets.UTF_8); + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, http://git-wip-us.apache.org/repos/asf/flume/blob/2fe39389/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 287d066..c4d7c6c 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -976,49 +976,53 @@ Despite the reliability guarantees of this source, there are still cases in which events may be duplicated if certain downstream failures occur. This is consistent with the guarantees offered by other Flume components. -==================== ============== ========================================================== -Property Name Default Description -==================== ============== ========================================================== -**channels** -- -**type** -- The component type name, needs to be ``spooldir``. -**spoolDir** -- The directory from which to read files from. -fileSuffix .COMPLETED Suffix to append to completely ingested files -deletePolicy never When to delete completed files: ``never`` or ``immediate`` -fileHeader false Whether to add a header storing the absolute path filename. 
-fileHeaderKey file Header key to use when appending absolute path filename to event header. -basenameHeader false Whether to add a header storing the basename of the file. -basenameHeaderKey basename Header Key to use when appending basename of file to event header. -ignorePattern ^$ Regular expression specifying which files to ignore (skip) -trackerDir .flumespool Directory to store metadata related to processing of files. - If this path is not an absolute path, then it is interpreted as relative to the spoolDir. -consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``, - ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified - time of the files will be used to compare the files. In case of a tie, the file - with smallest lexicographical order will be consumed first. In case of ``random`` any - file will be picked randomly. When using ``oldest`` and ``youngest`` the whole - directory will be scanned to pick the oldest/youngest file, which might be slow if there - are a large number of files, while using ``random`` may cause old files to be consumed - very late if new files keep coming in the spooling directory. -pollDelay 500 Delay (in milliseconds) used when polling for new files. -maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter. -batchSize 100 Granularity at which to batch transfer to the channel -inputCharset UTF-8 Character set used by deserializers that treat the input file as text. -decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file. - ``FAIL``: Throw an exception and fail to parse the file. 
- ``REPLACE``: Replace the unparseable character with the "replacement character" char, - typically Unicode U+FFFD. - ``IGNORE``: Drop the unparseable character sequence. -deserializer ``LINE`` Specify the deserializer used to parse the file into events. - Defaults to parsing each line as an event. The class specified must implement - ``EventDeserializer.Builder``. -deserializer.* Varies per event deserializer. -bufferMaxLines -- (Obselete) This option is now ignored. -bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. -selector.type replicating replicating or multiplexing -selector.* Depends on the selector.type value -interceptors -- Space-separated list of interceptors +====================== ============== ========================================================== +Property Name Default Description +====================== ============== ========================================================== +**channels** -- +**type** -- The component type name, needs to be ``spooldir``. +**spoolDir** -- The directory from which to read files from. +fileSuffix .COMPLETED Suffix to append to completely ingested files +deletePolicy never When to delete completed files: ``never`` or ``immediate`` +fileHeader false Whether to add a header storing the absolute path filename. +fileHeaderKey file Header key to use when appending absolute path filename to event header. +basenameHeader false Whether to add a header storing the basename of the file. +basenameHeaderKey basename Header Key to use when appending basename of file to event header. +ignorePattern ^$ Regular expression specifying which files to ignore (skip) +trackerDir .flumespool Directory to store metadata related to processing of files. + If this path is not an absolute path, then it is interpreted as relative to the spoolDir. 
+consumeOrder oldest In which order files in the spooling directory will be consumed ``oldest``, + ``youngest`` and ``random``. In case of ``oldest`` and ``youngest``, the last modified + time of the files will be used to compare the files. In case of a tie, the file + with smallest lexicographical order will be consumed first. In case of ``random`` any + file will be picked randomly. When using ``oldest`` and ``youngest`` the whole + directory will be scanned to pick the oldest/youngest file, which might be slow if there + are a large number of files, while using ``random`` may cause old files to be consumed + very late if new files keep coming in the spooling directory. +pollDelay 500 Delay (in milliseconds) used when polling for new files. +recursiveDirectorySearch false Whether to monitor sub directories for new files to read. +maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to + write to the channel(s) if the channel is full. The source will start at + a low backoff and increase it exponentially each time the channel throws a + ChannelException, upto the value specified by this parameter. +batchSize 100 Granularity at which to batch transfer to the channel +inputCharset UTF-8 Character set used by deserializers that treat the input file as text. +decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file. + ``FAIL``: Throw an exception and fail to parse the file. + ``REPLACE``: Replace the unparseable character with the "replacement character" char, + typically Unicode U+FFFD. + ``IGNORE``: Drop the unparseable character sequence. +deserializer ``LINE`` Specify the deserializer used to parse the file into events. + Defaults to parsing each line as an event. The class specified must implement + ``EventDeserializer.Builder``. +deserializer.* Varies per event deserializer. +bufferMaxLines -- (Obselete) This option is now ignored. 
+bufferMaxLineLength 5000 (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead. +selector.type replicating replicating or multiplexing +selector.* Depends on the selector.type value +interceptors -- Space-separated list of interceptors interceptors.* -==================== ============== ========================================================== +======================= ============== ========================================================== Example for an agent named agent-1:
