Hi,
for a project using the Spooling Directory Source with an HDFS sink, I wanted
to have the same (relative) directory structure in HDFS as in the spool
directory, which uses subdirectories.
To achieve this, I updated the following files (all in flume-ng-core):
org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
org/apache/flume/source/SpoolDirectorySource.java
org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
to include the following additional (optional) headers (analogous to
basenameHeader):
parentDirectory (the parent directory of the file)
example:
spooldirectory: /var/lib/flume/data/
file: /var/lib/flume/data/some/subdirectory/somefile.log
parentDirectoryHeader = /var/lib/flume/data/some/subdirectory
relativeParentDirectory (the parent directory of the file, relative to the
spool directory)
example:
spooldirectory: /var/lib/flume/data/
file: /var/lib/flume/data/some/subdirectory/somefile.log
relativeParentDirectoryHeader = /some/subdirectory
I'm now using the following Flume config (excerpt) to get a nice folder
structure in HDFS:
flume.sources.dirSource.spoolDir = /var/lib/flume/data
flume.sources.dirSource.recursiveDirectorySearch = true
flume.sources.dirSource.basenameHeader = true
flume.sources.dirSource.basenameHeaderKey = basename
flume.sources.dirSource.relativeParentDirectoryHeader = true
flume.sources.dirSource.relativeParentDirectoryHeaderKey = relativeParentDirectory
...
flume.sinks.HDFS.type = hdfs
flume.sinks.HDFS.hdfs.path = hdfs://bigdata.example.com:54310/application/root/directory/%{relativeParentDirectory}
flume.sinks.HDFS.hdfs.fileType = DataStream
flume.sinks.HDFS.hdfs.filePrefix = %{basename}
example:
a file : /var/lib/flume/data/some/subdirectory/somefile.log
would now be stored in
hdfs://bigdata.example.com:54310/application/root/directory/some/subdirectory/somefile.log.1476194723885
I attach three patches in case someone finds this useful
(used: http://git-wip-us.apache.org/repos/asf/flume.git => branch: trunk,
and the instructions from here [1] to create the patches).
krj
[1]
http://stackoverflow.com/questions/9396240/how-do-i-simply-create-a-patch-from-my-latest-git-commit
*Jürgen Jakobitsch*
Innovation Director
Semantic Web Company GmbH
EU: +43-1-4021235-0
Mobile: +43-676-6212710
http://www.semantic-web.at
http://www.poolparty.biz
PERSONAL INFORMATION
| web : http://www.turnguard.com
| foaf : http://www.turnguard.com/turnguard
| g+ : https://plus.google.com/111233759991616358206/posts
| skype : jakobitsch-punkt
| xmlns:tg = "http://www.turnguard.com/turnguard#"
commit ac0c8c20462c369d41363342f3ca0b5bdc701f09
Author: jürgen jakobitsch <[email protected]>
Date: Tue Oct 11 16:08:07 2016 +0200
relativeParentDirectory as header
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index a0f929c..7e5df36 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -96,8 +96,12 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
private final File metaFile;
private final boolean annotateFileName;
private final boolean annotateBaseName;
+ private final boolean annotateParentDirectory;
+ private final boolean annotateRelativeParentDirectory;
private final String fileNameHeader;
private final String baseNameHeader;
+ private final String parentDirectoryHeader;
+ private final String relativeParentDirectoryHeader;
private final String deletePolicy;
private final Charset inputCharset;
private final DecodeErrorPolicy decodeErrorPolicy;
@@ -121,6 +125,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
String completedSuffix, String ignorePattern, String trackerDirPath,
boolean annotateFileName, String fileNameHeader,
boolean annotateBaseName, String baseNameHeader,
+ boolean annotateParentDirectory, String parentDirectoryHeader,
+ boolean annotateRelativeParentDirectory, String relativeParentDirectoryHeader,
String deserializerType, Context deserializerContext,
String deletePolicy, String inputCharset,
DecodeErrorPolicy decodeErrorPolicy,
@@ -183,6 +189,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
this.fileNameHeader = fileNameHeader;
this.annotateBaseName = annotateBaseName;
this.baseNameHeader = baseNameHeader;
+ this.annotateParentDirectory = annotateParentDirectory;
+ this.parentDirectoryHeader = parentDirectoryHeader;
+ this.annotateRelativeParentDirectory = annotateRelativeParentDirectory;
+ this.relativeParentDirectoryHeader = relativeParentDirectoryHeader;
this.ignorePattern = Pattern.compile(ignorePattern);
this.deletePolicy = deletePolicy;
this.inputCharset = Charset.forName(inputCharset);
@@ -358,6 +368,22 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
event.getHeaders().put(baseNameHeader, basename);
}
}
+
+ if (annotateParentDirectory) {
+ String parentDirectory = currentFile.get().getFile().getParent();
+ for (Event event : events) {
+ event.getHeaders().put(parentDirectoryHeader, parentDirectory);
+ }
+ }
+
+ if (annotateRelativeParentDirectory) {
+ // Quote the spool dir path: replaceFirst() treats its first argument as a regex.
+ String relativeParentDirectory = currentFile.get().getFile().getParent()
+ .replaceFirst(Pattern.quote(spoolDirectory.getAbsolutePath()), "");
+ for (Event event : events) {
+ event.getHeaders().put(relativeParentDirectoryHeader, relativeParentDirectory);
+ }
+ }
}
@Override
@@ -666,6 +692,17 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
private String fileNameHeader =
SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
+
+ private Boolean annotateParentDirectory =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_PARENT_DIRECTORY_HEADER;
+ private String parentDirectoryHeader =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_PARENT_DIRECTORY_HEADER_KEY;
+
+ private Boolean annotateRelativeParentDirectory =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER;
+ private String relativeParentDirectoryHeader =
+ SpoolDirectorySourceConfigurationConstants.DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER_KEY;
+
private Boolean annotateBaseName =
SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER;
private String baseNameHeader =
@@ -714,6 +751,26 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
this.fileNameHeader = fileNameHeader;
return this;
}
+
+ public Builder annotateParentDirectory(Boolean annotateParentDirectory) {
+ this.annotateParentDirectory = annotateParentDirectory;
+ return this;
+ }
+
+ public Builder parentDirectoryHeader(String parentDirectoryHeader) {
+ this.parentDirectoryHeader = parentDirectoryHeader;
+ return this;
+ }
+
+ public Builder annotateRelativeParentDirectory(Boolean annotateRelativeParentDirectory) {
+ this.annotateRelativeParentDirectory = annotateRelativeParentDirectory;
+ return this;
+ }
+
+ public Builder relativeParentDirectoryHeader(String relativeParentDirectoryHeader) {
+ this.relativeParentDirectoryHeader = relativeParentDirectoryHeader;
+ return this;
+ }
public Builder annotateBaseName(Boolean annotateBaseName) {
this.annotateBaseName = annotateBaseName;
@@ -763,7 +820,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
public ReliableSpoolingFileEventReader build() throws IOException {
return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix,
ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
- annotateBaseName, baseNameHeader, deserializerType,
+ annotateBaseName, baseNameHeader, annotateParentDirectory, parentDirectoryHeader,
+ annotateRelativeParentDirectory, relativeParentDirectoryHeader, deserializerType,
deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy,
consumeOrder, recursiveDirectorySearch);
}
commit b4065a4f26a5f65fd498856a89940468383c8965
Author: jürgen jakobitsch <[email protected]>
Date: Tue Oct 11 16:09:41 2016 +0200
parentDirectory as header
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index c8c7cda..2f8c034 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -56,6 +56,10 @@ public class SpoolDirectorySource extends AbstractSource
private String fileHeaderKey;
private boolean basenameHeader;
private String basenameHeaderKey;
+ private boolean parentDirectoryHeader;
+ private String parentDirectoryHeaderKey;
+ private boolean relativeParentDirectoryHeader;
+ private String relativeParentDirectoryHeaderKey;
private int batchSize;
private String ignorePattern;
private String trackerDirPath;
@@ -95,6 +99,10 @@ public class SpoolDirectorySource extends AbstractSource
.fileNameHeader(fileHeaderKey)
.annotateBaseName(basenameHeader)
.baseNameHeader(basenameHeaderKey)
+ .annotateParentDirectory(parentDirectoryHeader)
+ .parentDirectoryHeader(parentDirectoryHeaderKey)
+ .annotateRelativeParentDirectory(relativeParentDirectoryHeader)
+ .relativeParentDirectoryHeader(relativeParentDirectoryHeaderKey)
.deserializerType(deserializerType)
.deserializerContext(deserializerContext)
.deletePolicy(deletePolicy)
@@ -155,6 +163,14 @@ public class SpoolDirectorySource extends AbstractSource
DEFAULT_BASENAME_HEADER);
basenameHeaderKey = context.getString(BASENAME_HEADER_KEY,
DEFAULT_BASENAME_HEADER_KEY);
+ parentDirectoryHeader = context.getBoolean(PARENT_DIRECTORY_HEADER,
+ DEFAULT_PARENT_DIRECTORY_HEADER);
+ parentDirectoryHeaderKey = context.getString(PARENT_DIRECTORY_HEADER_KEY,
+ DEFAULT_PARENT_DIRECTORY_HEADER_KEY);
+ relativeParentDirectoryHeader = context.getBoolean(RELATIVE_PARENT_DIRECTORY_HEADER,
+ DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER);
+ relativeParentDirectoryHeaderKey = context.getString(RELATIVE_PARENT_DIRECTORY_HEADER_KEY,
+ DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER_KEY);
batchSize = context.getInteger(BATCH_SIZE,
DEFAULT_BATCH_SIZE);
inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET);
commit 3969ccb2de71edc1885f463edf5f3032ab15c1d9
Author: jürgen jakobitsch <[email protected]>
Date: Tue Oct 11 16:10:27 2016 +0200
parentDirectory as header
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 5859aa2..64c89b6 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -38,10 +38,28 @@ public class SpoolDirectorySourceConfigurationConstants {
/** Header in which to put the basename of file. */
public static final String BASENAME_HEADER_KEY = "basenameHeaderKey";
public static final String DEFAULT_BASENAME_HEADER_KEY = "basename";
-
+
/** Whether to include the basename of a file in a header. */
public static final String BASENAME_HEADER = "basenameHeader";
public static final boolean DEFAULT_BASENAME_HEADER = false;
+
+ /** Header in which to put the parent directory of file. */
+ public static final String PARENT_DIRECTORY_HEADER_KEY = "parentDirectoryHeaderKey";
+ public static final String DEFAULT_PARENT_DIRECTORY_HEADER_KEY = "parentDirectory";
+
+ public static final String PARENT_DIRECTORY_HEADER = "parentDirectoryHeader";
+ public static final boolean DEFAULT_PARENT_DIRECTORY_HEADER = false;
+
+ /** Header in which to put the parent directory of file, relative to the spool directory. */
+ public static final String RELATIVE_PARENT_DIRECTORY_HEADER_KEY =
+ "relativeParentDirectoryHeaderKey";
+ public static final String DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER_KEY
+ = "relativeParentDirectory";
+
+ public static final String RELATIVE_PARENT_DIRECTORY_HEADER = "relativeParentDirectoryHeader";
+ public static final boolean DEFAULT_RELATIVE_PARENT_DIRECTORY_HEADER = false;
+
+
/** What size to batch with before sending to ChannelProcessor. */
public static final String BATCH_SIZE = "batchSize";