Repository: flume Updated Branches: refs/heads/trunk 358bb6700 -> 1ca0765aa
FLUME-2955. Add file path to the header in TaildirSource Allow for adding a file path to the header dynamically. This is particularly useful when the filegroup path contains a regex expression. (tinawenqiao via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1ca0765a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1ca0765a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1ca0765a Branch: refs/heads/trunk Commit: 1ca0765aae795a41a43e39324f5f1c8bae57b751 Parents: 358bb67 Author: wenqiao <[email protected]> Authored: Wed Jul 20 11:12:40 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Jul 20 11:18:49 2016 -0700 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 3 ++ .../taildir/ReliableTaildirEventReader.java | 33 +++++++++++++++++--- .../flume/source/taildir/TaildirSource.java | 8 +++++ .../TaildirSourceConfigurationConstants.java | 8 +++++ .../flume/source/taildir/TestTaildirSource.java | 32 +++++++++++++++++-- 5 files changed, 78 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index d8bfebf..105a036 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1134,6 +1134,8 @@ cachePatternMatching true Listing direc containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity. +fileHeader false Whether to add a header storing the absolute path filename. 
+fileHeaderKey file Header key to use when appending the absolute path of the file to the event header. =================================== ============================== =================================================== Example for agent named a1: @@ -1151,6 +1153,7 @@ Example for agent named a1: a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.* a1.sources.r1.headers.f2.headerKey1 = value2 a1.sources.r1.headers.f2.headerKey2 = value2-2 + a1.sources.r1.fileHeader = true Twitter 1% firehose Source (experimental) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java index 1409f25..8838320 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java @@ -57,13 +57,16 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private boolean addByteOffset; private boolean cachePatternMatching; private boolean committed = true; + private final boolean annotateFileName; + private final String fileNameHeader; /** * Create a ReliableTaildirEventReader to watch the given directory.
*/ private ReliableTaildirEventReader(Map<String, String> filePaths, Table<String, String, String> headerTable, String positionFilePath, - boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching) throws IOException { + boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching, + boolean annotateFileName, String fileNameHeader) throws IOException { // Sanity checks Preconditions.checkNotNull(filePaths); Preconditions.checkNotNull(positionFilePath); @@ -84,6 +87,8 @@ public class ReliableTaildirEventReader implements ReliableEventReader { this.headerTable = headerTable; this.addByteOffset = addByteOffset; this.cachePatternMatching = cachePatternMatching; + this.annotateFileName = annotateFileName; + this.fileNameHeader = fileNameHeader; updateTailFiles(skipToEnd); logger.info("Updating position from position file: " + positionFilePath); @@ -193,9 +198,14 @@ public class ReliableTaildirEventReader implements ReliableEventReader { } Map<String, String> headers = currentFile.getHeaders(); - if (headers != null && !headers.isEmpty()) { + if (annotateFileName || (headers != null && !headers.isEmpty())) { for (Event event : events) { - event.getHeaders().putAll(headers); + if (headers != null && !headers.isEmpty()) { + event.getHeaders().putAll(headers); + } + if (annotateFileName) { + event.getHeaders().put(fileNameHeader, currentFile.getPath()); + } } } committed = false; @@ -287,6 +297,10 @@ public class ReliableTaildirEventReader implements ReliableEventReader { private boolean skipToEnd; private boolean addByteOffset; private boolean cachePatternMatching; + private Boolean annotateFileName = + TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER; + private String fileNameHeader = + TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY; public Builder filePaths(Map<String, String> filePaths) { this.filePaths = filePaths; @@ -318,9 +332,20 @@ public class ReliableTaildirEventReader implements ReliableEventReader { return this; } 
+ public Builder annotateFileName(boolean annotateFileName) { + this.annotateFileName = annotateFileName; + return this; + } + + public Builder fileNameHeader(String fileNameHeader) { + this.fileNameHeader = fileNameHeader; + return this; + } + public ReliableTaildirEventReader build() throws IOException { return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd, - addByteOffset, cachePatternMatching); + addByteOffset, cachePatternMatching, + annotateFileName, fileNameHeader); } } http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index eae1b1a..a107a01 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -84,6 +84,8 @@ public class TaildirSource extends AbstractSource implements private List<Long> idleInodes = new CopyOnWriteArrayList<Long>(); private Long backoffSleepIncrement; private Long maxBackOffSleepInterval; + private boolean fileHeader; + private String fileHeaderKey; @Override public synchronized void start() { @@ -96,6 +98,8 @@ public class TaildirSource extends AbstractSource implements .skipToEnd(skipToEnd) .addByteOffset(byteOffsetHeader) .cachePatternMatching(cachePatternMatching) + .annotateFileName(fileHeader) + .fileNameHeader(fileHeaderKey) .build(); } catch (IOException e) { throw new FlumeException("Error instantiating ReliableTaildirEventReader", e); @@ -176,6 +180,10 @@ public class TaildirSource extends AbstractSource implements 
PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT); maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP, PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP); + fileHeader = context.getBoolean(FILENAME_HEADER, + DEFAULT_FILE_HEADER); + fileHeaderKey = context.getString(FILENAME_HEADER_KEY, + DEFAULT_FILENAME_HEADER_KEY); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java index 2c49540..f2347f3 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -55,4 +55,12 @@ public class TaildirSourceConfigurationConstants { */ public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching"; public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true; + + /** Header in which to put absolute path filename. */ + public static final String FILENAME_HEADER_KEY = "fileHeaderKey"; + public static final String DEFAULT_FILENAME_HEADER_KEY = "file"; + + /** Whether to include absolute path filename in a header. 
*/ + public static final String FILENAME_HEADER = "fileHeader"; + public static final boolean DEFAULT_FILE_HEADER = false; } http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java index e090b74..097ee0b 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -41,13 +41,15 @@ import java.util.ArrayList; import java.util.List; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS; -import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants - .FILE_GROUPS_PREFIX; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.HEADERS_PREFIX; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -288,4 +290,30 @@ public class TestTaildirSource { assertArrayEquals("Files not consumed in expected 
order", expected.toArray(), consumedOrder.toArray()); } + + @Test + public void testPutFilenameHeader() throws IOException { + File f1 = new File(tmpDir, "file1"); + Files.write("f1\n", f1, Charsets.UTF_8); + + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "fg"); + context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*"); + context.put(FILENAME_HEADER, "true"); + context.put(FILENAME_HEADER_KEY, "path"); + + Configurables.configure(source, context); + source.start(); + source.process(); + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + txn.commit(); + txn.close(); + + assertNotNull(e.getHeaders().get("path")); + assertEquals(f1.getAbsolutePath(), + e.getHeaders().get("path")); + } }
