Repository: flume
Updated Branches:
  refs/heads/trunk af23980fb -> 4b21230b4


FLUME-2653 Allow hdfs sink inUseSuffix to be empty

This is based on the contributions for FLUME-2653 regarding a new feature
for the hdfs sink.
Added a new parameter, hdfs.emptyInUseSuffix, which makes the sink use an
empty in-use suffix so the output file name remains unchanged while it is
being written. See the user guide changes for details.
This is a feature that has been requested by the community.

I added a new JUnit test case for the new parameter.
As an extra check, I temporarily modified the existing test cases in my IDE
to use the new flag, and they passed. That modification was only a local
safety check and is not part of this PR.

This closes #237

Reviewers: Peter Turcsanyi, Ferenc Szabo

(Endre Major via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4b21230b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4b21230b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4b21230b

Branch: refs/heads/trunk
Commit: 4b21230b4dc5a2971fe40d7808f78cdaad5ec1f0
Parents: af23980
Author: Endre Major <[email protected]>
Authored: Mon Nov 19 12:16:26 2018 +0100
Committer: Ferenc Szabo <[email protected]>
Committed: Mon Nov 19 12:16:26 2018 +0100

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  3 +++
 .../apache/flume/sink/hdfs/HDFSEventSink.java   | 14 ++++++++++++-
 .../flume/sink/hdfs/TestHDFSEventSink.java      | 22 ++++++++++++++++++++
 3 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/4b21230b/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 6939b59..a983089 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -2372,6 +2372,9 @@ hdfs.filePrefix         FlumeData     Name prefixed to 
files created by Flume in
 hdfs.fileSuffix         --            Suffix to append to file (eg ``.avro`` - 
*NOTE: period is not automatically added*)
 hdfs.inUsePrefix        --            Prefix that is used for temporal files 
that flume actively writes into
 hdfs.inUseSuffix        ``.tmp``      Suffix that is used for temporal files 
that flume actively writes into
+hdfs.emptyInUseSuffix   false         If ``false``, ``hdfs.inUseSuffix`` is used while writing the output. After the output is closed,
+                                      ``hdfs.inUseSuffix`` is removed from the output file name.
+                                      If ``true``, the ``hdfs.inUseSuffix`` parameter is ignored and an empty string is used instead.
 hdfs.rollInterval       30            Number of seconds to wait before rolling 
current file
                                       (0 = never roll based on time interval)
 hdfs.rollSize           1024          File size to trigger roll, in bytes (0: 
never roll based on file size)

http://git-wip-us.apache.org/repos/asf/flume/blob/4b21230b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index a300996..bb2390e 100644
--- 
a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ 
b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -83,6 +83,9 @@ public class HDFSEventSink extends AbstractSink implements 
Configurable, BatchSi
   // Retry forever.
   private static final int defaultTryCount = Integer.MAX_VALUE;
 
+  public static final String IN_USE_SUFFIX_PARAM_NAME = "hdfs.inUseSuffix";
+
+
   /**
    * Default length of time we wait for blocking BucketWriter calls
    * before timing out the operation. Intended to prevent server hangs.
@@ -194,7 +197,16 @@ public class HDFSEventSink extends AbstractSink implements 
Configurable, BatchSi
     fileName = context.getString("hdfs.filePrefix", defaultFileName);
     this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix);
     inUsePrefix = context.getString("hdfs.inUsePrefix", defaultInUsePrefix);
-    inUseSuffix = context.getString("hdfs.inUseSuffix", defaultInUseSuffix);
+    boolean emptyInUseSuffix = context.getBoolean("hdfs.emptyInUseSuffix", 
false);
+    if (emptyInUseSuffix) {
+      inUseSuffix = "";
+      String tmpInUseSuffix = context.getString(IN_USE_SUFFIX_PARAM_NAME);
+      if (tmpInUseSuffix != null) {
+        LOG.warn("Ignoring parameter " + IN_USE_SUFFIX_PARAM_NAME + " for hdfs 
sink: " + getName());
+      }
+    } else {
+      inUseSuffix = context.getString(IN_USE_SUFFIX_PARAM_NAME, 
defaultInUseSuffix);
+    }
     String tzName = context.getString("hdfs.timeZone");
     timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName);
     rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval);

http://git-wip-us.apache.org/repos/asf/flume/blob/4b21230b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
 
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index f86c96d..e836870 100644
--- 
a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ 
b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -1662,4 +1662,26 @@ public class TestHDFSEventSink {
     Assert.assertEquals(1, sc.getChannelReadFail());
   }
 
+  @Test
+  public void testEmptyInUseSuffix() {
+    String inUseSuffixConf = "aaaa";
+    Context context = new Context();
+    context.put("hdfs.path", testPath);
+    context.put("hdfs.inUseSuffix", inUseSuffixConf);
+
+    //hdfs.emptyInUseSuffix not defined
+    Configurables.configure(sink, context);
+    String inUseSuffix = (String) Whitebox.getInternalState(sink, 
"inUseSuffix");
+    Assert.assertEquals(inUseSuffixConf, inUseSuffix);
+
+    context.put("hdfs.emptyInUseSuffix", "true");
+    Configurables.configure(sink, context);
+    inUseSuffix = (String) Whitebox.getInternalState(sink, "inUseSuffix");
+    Assert.assertEquals("", inUseSuffix);
+
+    context.put("hdfs.emptyInUseSuffix", "false");
+    Configurables.configure(sink, context);
+    inUseSuffix = (String) Whitebox.getInternalState(sink, "inUseSuffix");
+    Assert.assertEquals(inUseSuffixConf, inUseSuffix);
+  }
 }

Reply via email to