hi,

For a project using the SpoolDirectorySource with an HDFS sink, I wanted
to have the same (relative) directory structure in HDFS as in the spool
directory, which uses subdirectories.

To achieve this, I updated the following files (all in flume-ng-core):

org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
org/apache/flume/source/SpoolDirectorySource.java
org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java

to add the following optional headers (analogous to
basenameHeader):

parentDirectory (the parent directory of the file)

example:
spooldirectory: /var/lib/flume/data/
file: /var/lib/flume/data/some/subdirectory/somefile.log

header parentDirectory = /var/lib/flume/data/some/subdirectory

relativeParentDirectory (the parent directory of the file, relative to the
spool directory)

example:
spooldirectory: /var/lib/flume/data/
file: /var/lib/flume/data/some/subdirectory/somefile.log

header relativeParentDirectory = some/subdirectory


I'm now using the following Flume configuration (excerpt) to get a matching
folder structure in HDFS:

flume.sources.dirSource.spoolDir = /var/lib/flume/data
flume.sources.dirSource.recursiveDirectorySearch = true
flume.sources.dirSource.basenameHeader = true
flume.sources.dirSource.basenameHeaderKey = basename
flume.sources.dirSource.relativeParentDirectoryHeader = true
flume.sources.dirSource.relativeParentDirectoryHeaderKey = relativeParentDirectory
...
flume.sinks.HDFS.type = hdfs
flume.sinks.HDFS.hdfs.path = hdfs://bigdata.example.com:54310/application/root/directory/%{relativeParentDirectory}
flume.sinks.HDFS.hdfs.fileType = DataStream
flume.sinks.HDFS.hdfs.filePrefix = %{basename}

Example: the file /var/lib/flume/data/some/subdirectory/somefile.log
would now be stored as

hdfs://bigdata.example.com:54310/application/root/directory/some/subdirectory/somefile.log.1476194723885

I've attached three patches in case someone finds this useful. They were
created against http://git-wip-us.apache.org/repos/asf/flume.git (branch
trunk) using the instructions in [1].

krj

[1]
http://stackoverflow.com/questions/9396240/how-do-i-simply-create-a-patch-from-my-latest-git-commit

*Jürgen Jakobitsch*
Innovation Director
Semantic Web Company GmbH
EU: +43-1-4021235-0
Mobile: +43-676-6212710
http://www.semantic-web.at
http://www.poolparty.biz


PERSONAL INFORMATION
| web       : http://www.turnguard.com
| foaf      : http://www.turnguard.com/turnguard
| g+        : https://plus.google.com/111233759991616358206/posts
| skype     : jakobitsch-punkt
| xmlns:tg  = "http://www.turnguard.com/turnguard#";
commit ac0c8c20462c369d41363342f3ca0b5bdc701f09
Author: jürgen jakobitsch <[email protected]>
Date:   Tue Oct 11 16:08:07 2016 +0200

    relativeParentDirectory as header

diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index a0f929c..7e5df36 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -96,8 +96,12 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private final File metaFile;
   private final boolean annotateFileName;
   private final boolean annotateBaseName;
+  private final boolean annotateParentDirectory;
+  private final boolean annotateRelativeParentDirectory;
   private final String fileNameHeader;
   private final String baseNameHeader;
+  private final String parentDirectoryHeader;
+  private final String relativeParentDirectoryHeader;  
   private final String deletePolicy;
   private final Charset inputCharset;
   private final DecodeErrorPolicy decodeErrorPolicy;
@@ -121,6 +125,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       String completedSuffix, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
       boolean annotateBaseName, String baseNameHeader,
+      boolean annotateParentDirectory, String parentDirectoryHeader,
+      boolean annotateRelativeParentDirectory, String relativeParentDirectoryHeader,
       String deserializerType, Context deserializerContext,
       String deletePolicy, String inputCharset,
       DecodeErrorPolicy decodeErrorPolicy,
@@ -183,6 +189,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.fileNameHeader = fileNameHeader;
     this.annotateBaseName = annotateBaseName;
     this.baseNameHeader = baseNameHeader;
+    this.annotateParentDirectory = annotateParentDirectory;
+    this.parentDirectoryHeader = parentDirectoryHeader;
+    this.annotateRelativeParentDirectory = annotateRelativeParentDirectory;
+    this.relativeParentDirectoryHeader = relativeParentDirectoryHeader;    
     this.ignorePattern = Pattern.compile(ignorePattern);
     this.deletePolicy = deletePolicy;
     this.inputCharset = Charset.forName(inputCharset);
@@ -358,6 +368,22 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         event.getHeaders().put(baseNameHeader, basename);
       }
     }
+
+    if (annotateParentDirectory) {
+      String parentDirectory = currentFile.get().getFile().getParent();
+      for (Event event : events) {
+        event.getHeaders().put(parentDirectoryHeader, parentDirectory);
+      }
+    }
+
+    if (annotateRelativeParentDirectory) {
+      // Path.relativize: yields "a/b" without a leading '/', and does no regex matching
+      String relativeParentDirectory = spoolDirectory.getAbsoluteFile().toPath().relativize(
+          currentFile.get().getFile().getAbsoluteFile().getParentFile().toPath()).toString();
+      for (Event event : events) {
+        event.getHeaders().put(relativeParentDirectoryHeader, relativeParentDirectory);
+      }
+    }
   }
 
   @Override
@@ -666,6 +692,17 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
     private String fileNameHeader =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
+    
+    private Boolean annotateParentDirectory =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_PARENT_DIRECTORY_HEADER;
+    private String parentDirectoryHeader =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_PARENT_DIRECTORY_HEADER_KEY;
+    
+    private Boolean annotateRelativeParentDirectory =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER;
+    private String relativeParentDirectoryHeader =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER_KEY;
+        
     private Boolean annotateBaseName =
         SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER;
     private String baseNameHeader =
@@ -714,6 +751,26 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
       this.fileNameHeader = fileNameHeader;
       return this;
     }
+    
+    public Builder annotateParentDirectory(Boolean annotateParentDirectory) {
+      this.annotateParentDirectory = annotateParentDirectory;
+      return this;
+    }
+
+    public Builder parentDirectoryHeader(String parentDirectoryHeader) {
+      this.parentDirectoryHeader = parentDirectoryHeader;
+      return this;
+    }
+    
+    public Builder annotateRelativeParentDirectory(Boolean annotateRelativeParentDirectory) {
+      this.annotateRelativeParentDirectory = annotateRelativeParentDirectory;
+      return this;
+    }
+
+    public Builder relativeParentDirectoryHeader(String relativeParentDirectoryHeader) {
+      this.relativeParentDirectoryHeader = relativeParentDirectoryHeader;
+      return this;
+    }    
 
     public Builder annotateBaseName(Boolean annotateBaseName) {
       this.annotateBaseName = annotateBaseName;
@@ -763,7 +820,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     public ReliableSpoolingFileEventReader build() throws IOException {
       return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
           ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
-          annotateBaseName, baseNameHeader, deserializerType,
+          annotateBaseName, baseNameHeader, annotateParentDirectory, parentDirectoryHeader,
+          annotateRelativeParentDirectory, relativeParentDirectoryHeader, deserializerType,
           deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy,
           consumeOrder, recursiveDirectorySearch);
     }
commit b4065a4f26a5f65fd498856a89940468383c8965
Author: jürgen jakobitsch <[email protected]>
Date:   Tue Oct 11 16:09:41 2016 +0200

    parentDirectory as header

diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index c8c7cda..2f8c034 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -56,6 +56,10 @@ public class SpoolDirectorySource extends AbstractSource
   private String fileHeaderKey;
   private boolean basenameHeader;
   private String basenameHeaderKey;
+  private boolean parentDirectoryHeader;
+  private String parentDirectoryHeaderKey;  
+  private boolean relativeParentDirectoryHeader;
+  private String relativeParentDirectoryHeaderKey;    
   private int batchSize;
   private String ignorePattern;
   private String trackerDirPath;
@@ -95,6 +99,10 @@ public class SpoolDirectorySource extends AbstractSource
           .fileNameHeader(fileHeaderKey)
           .annotateBaseName(basenameHeader)
           .baseNameHeader(basenameHeaderKey)
+          .annotateParentDirectory(parentDirectoryHeader)
+          .parentDirectoryHeader(parentDirectoryHeaderKey)
+          .annotateRelativeParentDirectory(relativeParentDirectoryHeader)
+          .relativeParentDirectoryHeader(relativeParentDirectoryHeaderKey)
           .deserializerType(deserializerType)
           .deserializerContext(deserializerContext)
           .deletePolicy(deletePolicy)
@@ -155,6 +163,14 @@ public class SpoolDirectorySource extends AbstractSource
         DEFAULT_BASENAME_HEADER);
     basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
         DEFAULT_BASENAME_HEADER_KEY);
+    parentDirectoryHeader = context.getBoolean(PARENT_DIRECTORY_HEADER, 
+         DEFAULT_PARENT_DIRECTORY_HEADER);
+    parentDirectoryHeaderKey = context.getString(PARENT_DIRECTORY_HEADER_KEY, 
+         DEFAULT_PARENT_DIRECTORY_HEADER_KEY);    
+    relativeParentDirectoryHeader = context.getBoolean(RELATIVE_PARENT_DIRECTORY_HEADER, 
+         DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER);
+    relativeParentDirectoryHeaderKey = context.getString(RELATIVE_PARENT_DIRECTORY_HEADER_KEY, 
+         DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER_KEY);        
     batchSize = context.getInteger(BATCH_SIZE,
         DEFAULT_BATCH_SIZE);
     inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
commit 3969ccb2de71edc1885f463edf5f3032ab15c1d9
Author: jürgen jakobitsch <[email protected]>
Date:   Tue Oct 11 16:10:27 2016 +0200

    parentDirectory as header

diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 5859aa2..64c89b6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -38,10 +38,28 @@ public class SpoolDirectorySourceConfigurationConstants {
   /** Header in which to put the basename of file. */
   public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
   public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";
 
   /** Whether to include the basename of a file in a header. */
   public static final String BASENAME_HEADER = "basenameHeader";
   public static final boolean DEFAULT_BASENAME_HEADER = false;
+
+  /** Header in which to put the parent directory of file. */
+  public static final String PARENT_DIRECTORY_HEADER_KEY = "parentDirectoryHeaderKey";
+  public static final String DEFAULT_PARENT_DIRECTORY_HEADER_KEY = "parentDirectory";
+
+  /** Whether to include the parent directory of a file in a header. */
+  public static final String PARENT_DIRECTORY_HEADER = "parentDirectoryHeader";
+  public static final boolean DEFAULT_PARENT_DIRECTORY_HEADER = false;
+
+  /** Header in which to put the parent directory of file, relative to the spool directory. */
+  public static final String RELATIVE_PARENT_DIRECTORY_HEADER_KEY =
+      "relativeParentDirectoryHeaderKey";
+  public static final String DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER_KEY =
+      "relativeParentDirectory";
+
+  /** Whether to include the spool-relative parent directory of a file in a header. */
+  public static final String RELATIVE_PARENT_DIRECTORY_HEADER = "relativeParentDirectoryHeader";
+  public static final boolean DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER = false;
 
   /** What size to batch with before sending to ChannelProcessor. */
   public static final String BATCH_SIZE = "batchSize";

Reply via email to