Repository: flume
Updated Branches:
  refs/heads/trunk 358bb6700 -> 1ca0765aa


FLUME-2955. Add file path to the header in TaildirSource

Allow for adding a file path to the header dynamically. This is
particularly useful when the filegroup path contains a regular expression.

(tinawenqiao via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1ca0765a
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1ca0765a
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1ca0765a

Branch: refs/heads/trunk
Commit: 1ca0765aae795a41a43e39324f5f1c8bae57b751
Parents: 358bb67
Author: wenqiao <[email protected]>
Authored: Wed Jul 20 11:12:40 2016 -0700
Committer: Mike Percy <[email protected]>
Committed: Wed Jul 20 11:18:49 2016 -0700

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  3 ++
 .../taildir/ReliableTaildirEventReader.java     | 33 +++++++++++++++++---
 .../flume/source/taildir/TaildirSource.java     |  8 +++++
 .../TaildirSourceConfigurationConstants.java    |  8 +++++
 .../flume/source/taildir/TestTaildirSource.java | 32 +++++++++++++++++--
 5 files changed, 78 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index d8bfebf..105a036 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1134,6 +1134,8 @@ cachePatternMatching                true                           Listing direc
                                                                     containing thousands of files. Caching the list of matching files can improve performance.
                                                                     The order in which files are consumed will also be cached.
                                                                     Requires that the file system keeps track of modification times with at least a 1-second granularity.
+fileHeader                          false                          Whether to add a header storing the absolute path filename.
+fileHeaderKey                       file                           Header key to use when appending absolute path filename to event header.
 =================================== ============================== ===================================================
 
 Example for agent named a1:
@@ -1151,6 +1153,7 @@ Example for agent named a1:
   a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
   a1.sources.r1.headers.f2.headerKey1 = value2
   a1.sources.r1.headers.f2.headerKey2 = value2-2
+  a1.sources.r1.fileHeader = true
 
 Twitter 1% firehose Source (experimental)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
index 1409f25..8838320 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/ReliableTaildirEventReader.java
@@ -57,13 +57,16 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
   private boolean addByteOffset;
   private boolean cachePatternMatching;
   private boolean committed = true;
+  private final boolean annotateFileName;
+  private final String fileNameHeader;
 
   /**
    * Create a ReliableTaildirEventReader to watch the given directory.
    */
   private ReliableTaildirEventReader(Map<String, String> filePaths,
       Table<String, String, String> headerTable, String positionFilePath,
-      boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching) throws IOException {
+      boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching,
+      boolean annotateFileName, String fileNameHeader) throws IOException {
     // Sanity checks
     Preconditions.checkNotNull(filePaths);
     Preconditions.checkNotNull(positionFilePath);
@@ -84,6 +87,8 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
     this.headerTable = headerTable;
     this.addByteOffset = addByteOffset;
     this.cachePatternMatching = cachePatternMatching;
+    this.annotateFileName = annotateFileName;
+    this.fileNameHeader = fileNameHeader;
     updateTailFiles(skipToEnd);
 
     logger.info("Updating position from position file: " + positionFilePath);
@@ -193,9 +198,14 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
     }
 
     Map<String, String> headers = currentFile.getHeaders();
-    if (headers != null && !headers.isEmpty()) {
+    if (annotateFileName || (headers != null && !headers.isEmpty())) {
       for (Event event : events) {
-        event.getHeaders().putAll(headers);
+        if (headers != null && !headers.isEmpty()) {
+          event.getHeaders().putAll(headers);
+        }
+        if (annotateFileName) {
+          event.getHeaders().put(fileNameHeader, currentFile.getPath());
+        }
       }
     }
     committed = false;
@@ -287,6 +297,10 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
     private boolean skipToEnd;
     private boolean addByteOffset;
     private boolean cachePatternMatching;
+    private Boolean annotateFileName =
+            TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER;
+    private String fileNameHeader =
+            TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
 
     public Builder filePaths(Map<String, String> filePaths) {
       this.filePaths = filePaths;
@@ -318,9 +332,20 @@ public class ReliableTaildirEventReader implements ReliableEventReader {
       return this;
     }
 
+    public Builder annotateFileName(boolean annotateFileName) {
+      this.annotateFileName = annotateFileName;
+      return this;
+    }
+
+    public Builder fileNameHeader(String fileNameHeader) {
+      this.fileNameHeader = fileNameHeader;
+      return this;
+    }
+
     public ReliableTaildirEventReader build() throws IOException {
       return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
-                                            addByteOffset, cachePatternMatching);
+                                            addByteOffset, cachePatternMatching,
+                                            annotateFileName, fileNameHeader);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index eae1b1a..a107a01 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -84,6 +84,8 @@ public class TaildirSource extends AbstractSource implements
   private List<Long> idleInodes = new CopyOnWriteArrayList<Long>();
   private Long backoffSleepIncrement;
   private Long maxBackOffSleepInterval;
+  private boolean fileHeader;
+  private String fileHeaderKey;
 
   @Override
   public synchronized void start() {
@@ -96,6 +98,8 @@ public class TaildirSource extends AbstractSource implements
           .skipToEnd(skipToEnd)
           .addByteOffset(byteOffsetHeader)
           .cachePatternMatching(cachePatternMatching)
+          .annotateFileName(fileHeader)
+          .fileNameHeader(fileHeaderKey)
           .build();
     } catch (IOException e) {
       throw new FlumeException("Error instantiating ReliableTaildirEventReader", e);
@@ -176,6 +180,10 @@ public class TaildirSource extends AbstractSource implements
         PollableSourceConstants.DEFAULT_BACKOFF_SLEEP_INCREMENT);
     maxBackOffSleepInterval = context.getLong(PollableSourceConstants.MAX_BACKOFF_SLEEP,
         PollableSourceConstants.DEFAULT_MAX_BACKOFF_SLEEP);
+    fileHeader = context.getBoolean(FILENAME_HEADER,
+            DEFAULT_FILE_HEADER);
+    fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
+            DEFAULT_FILENAME_HEADER_KEY);
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
index 2c49540..f2347f3 100644
--- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
+++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
@@ -55,4 +55,12 @@ public class TaildirSourceConfigurationConstants {
    */
   public static final String CACHE_PATTERN_MATCHING = "cachePatternMatching";
   public static final boolean DEFAULT_CACHE_PATTERN_MATCHING = true;
+
+  /** Header in which to put absolute path filename. */
+  public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
+  public static final String DEFAULT_FILENAME_HEADER_KEY = "file";
+
+  /** Whether to include absolute path filename in a header. */
+  public static final String FILENAME_HEADER = "fileHeader";
+  public static final boolean DEFAULT_FILE_HEADER = false;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/1ca0765a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
index e090b74..097ee0b 100644
--- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
+++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -41,13 +41,15 @@ import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS;
-import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants
-                  .FILE_GROUPS_PREFIX;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILE_GROUPS_PREFIX;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.HEADERS_PREFIX;
 import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER;
+import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -288,4 +290,30 @@ public class TestTaildirSource {
     assertArrayEquals("Files not consumed in expected order", expected.toArray(),
                       consumedOrder.toArray());
   }
+
+  @Test
+  public void testPutFilenameHeader() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    Files.write("f1\n", f1, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "fg");
+    context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*");
+    context.put(FILENAME_HEADER, "true");
+    context.put(FILENAME_HEADER_KEY, "path");
+
+    Configurables.configure(source, context);
+    source.start();
+    source.process();
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event e = channel.take();
+    txn.commit();
+    txn.close();
+
+    assertNotNull(e.getHeaders().get("path"));
+    assertEquals(f1.getAbsolutePath(),
+            e.getHeaders().get("path"));
+  }
 }

Reply via email to