Repository: flume Updated Branches: refs/heads/trunk af23980fb -> 4b21230b4
FLUME-2653 Allow hdfs sink inUseSuffix to be empty This is based on the contributions for FLUME-2653 regarding a new feature for the hdfs sink. Added a new parameter hdfs.emptyInUseSuffix to allow the output file name to remain unchanged. See the user guide changes for details. This is a desired feature from the community. I added a new JUnit test case for testing. I also temporarily modified the old test cases in my IDE to use the new flag, and they passed. I did this only as an additional check, to be on the safe side; that change is not in this PR. This closes #237 Reviewers: Peter Turcsanyi, Ferenc Szabo (Endre Major via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4b21230b Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4b21230b Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4b21230b Branch: refs/heads/trunk Commit: 4b21230b4dc5a2971fe40d7808f78cdaad5ec1f0 Parents: af23980 Author: Endre Major <[email protected]> Authored: Mon Nov 19 12:16:26 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Mon Nov 19 12:16:26 2018 +0100 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 3 +++ .../apache/flume/sink/hdfs/HDFSEventSink.java | 14 ++++++++++++- .../flume/sink/hdfs/TestHDFSEventSink.java | 22 ++++++++++++++++++++ 3 files changed, 38 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4b21230b/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 6939b59..a983089 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2372,6 +2372,9 @@ hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs.fileSuffix -- 
Suffix to append to file (eg ``.avro`` - *NOTE: period is not automatically added*) hdfs.inUsePrefix -- Prefix that is used for temporal files that flume actively writes into hdfs.inUseSuffix ``.tmp`` Suffix that is used for temporal files that flume actively writes into +hdfs.emptyInUseSuffix false If ``false`` an ``hdfs.inUseSuffix`` is used while writing the output. After closing the output + ``hdfs.inUseSuffix`` is removed from the output file name. + If ``true`` the ``hdfs.inUseSuffix`` parameter is ignored and an empty string is used instead. hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval) hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size) http://git-wip-us.apache.org/repos/asf/flume/blob/4b21230b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index a300996..bb2390e 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -83,6 +83,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable, BatchSi // Retry forever. private static final int defaultTryCount = Integer.MAX_VALUE; + public static final String IN_USE_SUFFIX_PARAM_NAME = "hdfs.inUseSuffix"; + + /** * Default length of time we wait for blocking BucketWriter calls * before timing out the operation. Intended to prevent server hangs. 
@@ -194,7 +197,16 @@ public class HDFSEventSink extends AbstractSink implements Configurable, BatchSi fileName = context.getString("hdfs.filePrefix", defaultFileName); this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix); inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix); - inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix); + boolean emptyInUseSuffix = context.getBoolean("hdfs.emptyInUseSuffix", false); + if (emptyInUseSuffix) { + inUseSuffix = ""; + String tmpInUseSuffix = context.getString(IN_USE_SUFFIX_PARAM_NAME); + if (tmpInUseSuffix != null) { + LOG.warn("Ignoring parameter " + IN_USE_SUFFIX_PARAM_NAME + " for hdfs sink: " + getName()); + } + } else { + inUseSuffix = context.getString(IN_USE_SUFFIX_PARAM_NAME, defaultInUseSuffix); + } String tzName = context.getString("hdfs.timeZone"); timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName); rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval); http://git-wip-us.apache.org/repos/asf/flume/blob/4b21230b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index f86c96d..e836870 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -1662,4 +1662,26 @@ public class TestHDFSEventSink { Assert.assertEquals(1, sc.getChannelReadFail()); } + @Test + public void testEmptyInUseSuffix() { + String inUseSuffixConf = "aaaa"; + Context context = new Context(); + context.put("hdfs.path", testPath); + context.put("hdfs.inUseSuffix", inUseSuffixConf); + + 
//hdfs.emptyInUseSuffix not defined + Configurables.configure(sink, context); + String inUseSuffix = (String) Whitebox.getInternalState(sink, "inUseSuffix"); + Assert.assertEquals(inUseSuffixConf, inUseSuffix); + + context.put("hdfs.emptyInUseSuffix", "true"); + Configurables.configure(sink, context); + inUseSuffix = (String) Whitebox.getInternalState(sink, "inUseSuffix"); + Assert.assertEquals("", inUseSuffix); + + context.put("hdfs.emptyInUseSuffix", "false"); + Configurables.configure(sink, context); + inUseSuffix = (String) Whitebox.getInternalState(sink, "inUseSuffix"); + Assert.assertEquals(inUseSuffixConf, inUseSuffix); + } }
