Updated Branches:
  refs/heads/trunk 0d0437347 -> 72b3b7889

FLUME-2056. Allow SpoolDir to pass just the filename that is the source of an event

(Jeff Lord via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/72b3b788
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/72b3b788
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/72b3b788

Branch: refs/heads/trunk
Commit: 72b3b7889f1dcf6736f099b77c878efe891c6585
Parents: 0d04373
Author: Mike Percy <[email protected]>
Authored: Mon Dec 16 14:59:14 2013 -0800
Committer: Mike Percy <[email protected]>
Committed: Mon Dec 16 14:59:14 2013 -0800

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   | 30 ++++++++++++++--
 .../flume/source/SpoolDirectorySource.java      |  8 +++++
 ...olDirectorySourceConfigurationConstants.java | 12 +++++--
 .../flume/source/TestSpoolDirectorySource.java  | 38 +++++++++++++++++++-
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  6 ++--
 5 files changed, 87 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/72b3b788/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
 
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index bd684ed..a88ed6e 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -92,7 +92,9 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
   private final Pattern ignorePattern;
   private final File metaFile;
   private final boolean annotateFileName;
+  private final boolean annotateBaseName;
   private final String fileNameHeader;
+  private final String baseNameHeader;
   private final String deletePolicy;
   private final Charset inputCharset;
   private final DecodeErrorPolicy decodeErrorPolicy;
@@ -108,6 +110,7 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
   private ReliableSpoolingFileEventReader(File spoolDirectory,
       String completedSuffix, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
+      boolean annotateBaseName, String baseNameHeader,
       String deserializerType, Context deserializerContext,
       String deletePolicy, String inputCharset,
       DecodeErrorPolicy decodeErrorPolicy) throws IOException {
@@ -164,6 +167,8 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
     this.deserializerContext = deserializerContext;
     this.annotateFileName = annotateFileName;
     this.fileNameHeader = fileNameHeader;
+    this.annotateBaseName = annotateBaseName;
+    this.baseNameHeader = baseNameHeader;
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
     this.inputCharset = Charset.forName(inputCharset);
@@ -253,6 +258,13 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
       }
     }
 
+    if (annotateBaseName) {
+      String basename = currentFile.get().getFile().getName();
+      for (Event event : events) {
+        event.getHeaders().put(baseNameHeader, basename);
+      }
+    }
+
     committed = false;
     lastFileRead = currentFile;
     return events;
@@ -510,6 +522,10 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
     private String fileNameHeader =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
+    private Boolean annotateBaseName =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER;
+    private String baseNameHeader =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER_KEY;
     private String deserializerType =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER;
     private Context deserializerContext = new Context();
@@ -551,6 +567,16 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
       return this;
     }
 
+    public Builder annotateBaseName(Boolean annotateBaseName) {
+      this.annotateBaseName = annotateBaseName;
+      return this;
+    }
+
+    public Builder baseNameHeader(String baseNameHeader) {
+      this.baseNameHeader = baseNameHeader;
+      return this;
+    }
+
     public Builder deserializerType(String deserializerType) {
       this.deserializerType = deserializerType;
       return this;
@@ -579,8 +605,8 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, 
completedSuffix,
           ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
-          deserializerType, deserializerContext, deletePolicy, inputCharset,
-          decodeErrorPolicy);
+          annotateBaseName, baseNameHeader, deserializerType,
+          deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/72b3b788/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 0160215..f42ed2d 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -54,6 +54,8 @@ Configurable, EventDrivenSource {
   private String spoolDirectory;
   private boolean fileHeader;
   private String fileHeaderKey;
+  private boolean basenameHeader;
+  private String basenameHeaderKey;
   private int batchSize;
   private String ignorePattern;
   private String trackerDirPath;
@@ -87,6 +89,8 @@ Configurable, EventDrivenSource {
           .trackerDirPath(trackerDirPath)
           .annotateFileName(fileHeader)
           .fileNameHeader(fileHeaderKey)
+          .annotateBaseName(basenameHeader)
+          .baseNameHeader(basenameHeaderKey)
           .deserializerType(deserializerType)
           .deserializerContext(deserializerContext)
           .deletePolicy(deletePolicy)
@@ -142,6 +146,10 @@ Configurable, EventDrivenSource {
         DEFAULT_FILE_HEADER);
     fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
         DEFAULT_FILENAME_HEADER_KEY);
+    basenameHeader = context.getBoolean(BASENAME_HEADER,
+        DEFAULT_BASENAME_HEADER);
+    basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
+        DEFAULT_BASENAME_HEADER_KEY);
     batchSize = context.getInteger(BATCH_SIZE,
         DEFAULT_BATCH_SIZE);
     inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);

http://git-wip-us.apache.org/repos/asf/flume/blob/72b3b788/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index a2befe8..83522c0 100644
--- 
a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ 
b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -27,14 +27,22 @@ public class SpoolDirectorySourceConfigurationConstants {
   public static final String SPOOLED_FILE_SUFFIX = "fileSuffix";
   public static final String DEFAULT_SPOOLED_FILE_SUFFIX = ".COMPLETED";
 
-  /** Header in which to put filename. */
+  /** Header in which to put absolute path filename. */
   public static final String FILENAME_HEADER_KEY = "fileHeaderKey";
   public static final String DEFAULT_FILENAME_HEADER_KEY = "file";
 
-  /** Whether to include filename in a header. */
+  /** Whether to include absolute path filename in a header. */
   public static final String FILENAME_HEADER = "fileHeader";
   public static final boolean DEFAULT_FILE_HEADER = false;
 
+  /** Header in which to put the basename of file. */
+  public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
+  public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";
+
+  /** Whether to include the basename of a file in a header. */
+  public static final String BASENAME_HEADER = "basenameHeader";
+  public static final boolean DEFAULT_BASENAME_HEADER = false;
+
   /** What size to batch with before sending to ChannelProcessor. */
   public static final String BATCH_SIZE = "batchSize";
   public static final int DEFAULT_BATCH_SIZE = 100;

http://git-wip-us.apache.org/repos/asf/flume/blob/72b3b788/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
index 9a546a5..503ab4d 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java
@@ -93,7 +93,9 @@ public class TestSpoolDirectorySource {
 
     Configurables.configure(source, context);
     source.start();
-    Thread.sleep(500);
+    while (source.getSourceCounter().getEventAcceptedCount() < 8) {
+      Thread.sleep(10);
+    }
     Transaction txn = channel.getTransaction();
     txn.begin();
     Event e = channel.take();
@@ -107,6 +109,40 @@ public class TestSpoolDirectorySource {
   }
 
   @Test
+  public void testPutBasenameHeader() throws IOException,
+    InterruptedException {
+    Context context = new Context();
+    File f1 = new File(tmpDir.getAbsolutePath() + "/file1");
+
+    Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" +
+      "file1line5\nfile1line6\nfile1line7\nfile1line8\n",
+      f1, Charsets.UTF_8);
+
+    context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY,
+        tmpDir.getAbsolutePath());
+    context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER,
+        "true");
+    context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER_KEY,
+        "basenameHeaderKeyTest");
+
+    Configurables.configure(source, context);
+    source.start();
+    while (source.getSourceCounter().getEventAcceptedCount() < 8) {
+      Thread.sleep(10);
+    }
+    Transaction txn = channel.getTransaction();
+    txn.begin();
+    Event e = channel.take();
+    Assert.assertNotNull("Event must not be null", e);
+    Assert.assertNotNull("Event headers must not be null", e.getHeaders());
+    Assert.assertNotNull(e.getHeaders().get("basenameHeaderKeyTest"));
+    Assert.assertEquals(f1.getName(),
+      e.getHeaders().get("basenameHeaderKeyTest"));
+    txn.commit();
+    txn.close();
+  }
+
+  @Test
   public void testLifecycle() throws IOException, InterruptedException {
     Context context = new Context();
     File f1 = new File(tmpDir.getAbsolutePath() + "/file1");

http://git-wip-us.apache.org/repos/asf/flume/blob/72b3b788/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 7a41efb..08c7740 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -946,8 +946,10 @@ Property Name         Default         Description
 **spoolDir**          --              The directory from which to read files 
from.
 fileSuffix            .COMPLETED      Suffix to append to completely ingested 
files
 deletePolicy          never           When to delete completed files: 
``never`` or ``immediate``
-fileHeader            false           Whether to add a header storing the 
filename
-fileHeaderKey         file            Header key to use when appending 
filename to header
+fileHeader            false           Whether to add a header storing the 
absolute path filename.
+fileHeaderKey         file            Header key to use when appending the
absolute path filename to the event header.
+basenameHeader        false           Whether to add a header storing the 
basename of the file.
+basenameHeaderKey     basename        Header key to use when appending the
basename of the file to the event header.
 ignorePattern         ^$              Regular expression specifying which 
files to ignore (skip)
 trackerDir            .flumespool     Directory to store metadata related to 
processing of files.
                                       If this path is not an absolute path, 
then it is interpreted as relative to the spoolDir.

Reply via email to