Updated Branches: refs/heads/flume-1.5 bf917dd96 -> 77fd194bf
FLUME-2056. Allow SpoolDir to pass just the filename that is the source of an event (Jeff Lord via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/77fd194b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/77fd194b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/77fd194b Branch: refs/heads/flume-1.5 Commit: 77fd194bf403aeb187a973b8854d09a5beac780d Parents: bf917dd Author: Mike Percy <[email protected]> Authored: Mon Dec 16 14:59:14 2013 -0800 Committer: Mike Percy <[email protected]> Committed: Mon Dec 16 14:59:51 2013 -0800 ---------------------------------------------------------------------- .../avro/ReliableSpoolingFileEventReader.java | 30 ++++++++++++++-- .../flume/source/SpoolDirectorySource.java | 8 +++++ ...olDirectorySourceConfigurationConstants.java | 12 +++++-- .../flume/source/TestSpoolDirectorySource.java | 38 +++++++++++++++++++- flume-ng-doc/sphinx/FlumeUserGuide.rst | 6 ++-- 5 files changed, 87 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java index bd684ed..a88ed6e 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java +++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java @@ -92,7 +92,9 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private final Pattern ignorePattern; private final File metaFile; private final boolean annotateFileName; + private final 
boolean annotateBaseName; private final String fileNameHeader; + private final String baseNameHeader; private final String deletePolicy; private final Charset inputCharset; private final DecodeErrorPolicy decodeErrorPolicy; @@ -108,6 +110,7 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { private ReliableSpoolingFileEventReader(File spoolDirectory, String completedSuffix, String ignorePattern, String trackerDirPath, boolean annotateFileName, String fileNameHeader, + boolean annotateBaseName, String baseNameHeader, String deserializerType, Context deserializerContext, String deletePolicy, String inputCharset, DecodeErrorPolicy decodeErrorPolicy) throws IOException { @@ -164,6 +167,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { this.deserializerContext = deserializerContext; this.annotateFileName = annotateFileName; this.fileNameHeader = fileNameHeader; + this.annotateBaseName = annotateBaseName; + this.baseNameHeader = baseNameHeader; this.ignorePattern = Pattern.compile(ignorePattern); this.deletePolicy = deletePolicy; this.inputCharset = Charset.forName(inputCharset); @@ -253,6 +258,13 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { } } + if (annotateBaseName) { + String basename = currentFile.get().getFile().getName(); + for (Event event : events) { + event.getHeaders().put(baseNameHeader, basename); + } + } + committed = false; lastFileRead = currentFile; return events; @@ -510,6 +522,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER; private String fileNameHeader = SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY; + private Boolean annotateBaseName = + SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER; + private String baseNameHeader = + SpoolDirectorySourceConfigurationConstants.DEFAULT_BASENAME_HEADER_KEY; private String 
deserializerType = SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER; private Context deserializerContext = new Context(); @@ -551,6 +567,16 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { return this; } + public Builder annotateBaseName(Boolean annotateBaseName) { + this.annotateBaseName = annotateBaseName; + return this; + } + + public Builder baseNameHeader(String baseNameHeader) { + this.baseNameHeader = baseNameHeader; + return this; + } + public Builder deserializerType(String deserializerType) { this.deserializerType = deserializerType; return this; @@ -579,8 +605,8 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader { public ReliableSpoolingFileEventReader build() throws IOException { return new ReliableSpoolingFileEventReader(spoolDirectory, completedSuffix, ignorePattern, trackerDirPath, annotateFileName, fileNameHeader, - deserializerType, deserializerContext, deletePolicy, inputCharset, - decodeErrorPolicy); + annotateBaseName, baseNameHeader, deserializerType, + deserializerContext, deletePolicy, inputCharset, decodeErrorPolicy); } } http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 0160215..f42ed2d 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -54,6 +54,8 @@ Configurable, EventDrivenSource { private String spoolDirectory; private boolean fileHeader; private String fileHeaderKey; + private boolean basenameHeader; + private String basenameHeaderKey; private int batchSize; private String ignorePattern; private String 
trackerDirPath; @@ -87,6 +89,8 @@ Configurable, EventDrivenSource { .trackerDirPath(trackerDirPath) .annotateFileName(fileHeader) .fileNameHeader(fileHeaderKey) + .annotateBaseName(basenameHeader) + .baseNameHeader(basenameHeaderKey) .deserializerType(deserializerType) .deserializerContext(deserializerContext) .deletePolicy(deletePolicy) @@ -142,6 +146,10 @@ Configurable, EventDrivenSource { DEFAULT_FILE_HEADER); fileHeaderKey = context.getString(FILENAME_HEADER_KEY, DEFAULT_FILENAME_HEADER_KEY); + basenameHeader = context.getBoolean(BASENAME_HEADER, + DEFAULT_BASENAME_HEADER); + basenameHeaderKey = context.getString(BASENAME_HEADER_KEY, + DEFAULT_BASENAME_HEADER_KEY); batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE); inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET); http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java index a2befe8..83522c0 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java @@ -27,14 +27,22 @@ public class SpoolDirectorySourceConfigurationConstants { public static final String SPOOLED_FILE_SUFFIX = "fileSuffix"; public static final String DEFAULT_SPOOLED_FILE_SUFFIX = ".COMPLETED"; - /** Header in which to put filename. */ + /** Header in which to put absolute path filename. */ public static final String FILENAME_HEADER_KEY = "fileHeaderKey"; public static final String DEFAULT_FILENAME_HEADER_KEY = "file"; - /** Whether to include filename in a header. 
*/ + /** Whether to include absolute path filename in a header. */ public static final String FILENAME_HEADER = "fileHeader"; public static final boolean DEFAULT_FILE_HEADER = false; + /** Header in which to put the basename of file. */ + public static final String BASENAME_HEADER_KEY = "basenameHeaderKey"; + public static final String DEFAULT_BASENAME_HEADER_KEY = "basename"; + + /** Whether to include the basename of a file in a header. */ + public static final String BASENAME_HEADER = "basenameHeader"; + public static final boolean DEFAULT_BASENAME_HEADER = false; + /** What size to batch with before sending to ChannelProcessor. */ public static final String BATCH_SIZE = "batchSize"; public static final int DEFAULT_BATCH_SIZE = 100; http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 9a546a5..503ab4d 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -93,7 +93,9 @@ public class TestSpoolDirectorySource { Configurables.configure(source, context); source.start(); - Thread.sleep(500); + while (source.getSourceCounter().getEventAcceptedCount() < 8) { + Thread.sleep(10); + } Transaction txn = channel.getTransaction(); txn.begin(); Event e = channel.take(); @@ -107,6 +109,40 @@ public class TestSpoolDirectorySource { } @Test + public void testPutBasenameHeader() throws IOException, + InterruptedException { + Context context = new Context(); + File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); + + Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + 
"file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); + + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER, + "true"); + context.put(SpoolDirectorySourceConfigurationConstants.BASENAME_HEADER_KEY, + "basenameHeaderKeyTest"); + + Configurables.configure(source, context); + source.start(); + while (source.getSourceCounter().getEventAcceptedCount() < 8) { + Thread.sleep(10); + } + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + Assert.assertNotNull("Event must not be null", e); + Assert.assertNotNull("Event headers must not be null", e.getHeaders()); + Assert.assertNotNull(e.getHeaders().get("basenameHeaderKeyTest")); + Assert.assertEquals(f1.getName(), + e.getHeaders().get("basenameHeaderKeyTest")); + txn.commit(); + txn.close(); + } + + @Test public void testLifecycle() throws IOException, InterruptedException { Context context = new Context(); File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); http://git-wip-us.apache.org/repos/asf/flume/blob/77fd194b/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 7a41efb..08c7740 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -946,8 +946,10 @@ Property Name Default Description **spoolDir** -- The directory from which to read files from. fileSuffix .COMPLETED Suffix to append to completely ingested files deletePolicy never When to delete completed files: ``never`` or ``immediate`` -fileHeader false Whether to add a header storing the filename -fileHeaderKey file Header key to use when appending filename to header +fileHeader false Whether to add a header storing the absolute path filename. 
+fileHeaderKey file Header key to use when appending the absolute path filename to the event header. +basenameHeader false Whether to add a header storing the basename of the file. +basenameHeaderKey basename Header key to use when appending the basename of the file to the event header. ignorePattern ^$ Regular expression specifying which files to ignore (skip) trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir.
