Updated Branches: refs/heads/trunk 4c5220bb7 -> 2f6fea509
FLUME-1955. fileSuffix does not work with compressed streams.

(Mike Percy via Hari Shreedharan)

Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2f6fea50
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2f6fea50
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2f6fea50

Branch: refs/heads/trunk
Commit: 2f6fea50965e5a8ae059d179b0b25be738696cef
Parents: 4c5220b
Author: Hari Shreedharan <[email protected]>
Authored: Tue Mar 19 16:32:08 2013 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Mar 19 16:32:08 2013 -0700

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java | 6 +--
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java | 3 +-
 .../apache/flume/sink/hdfs/TestBucketWriter.java | 31 +++++++++++++++
 3 files changed, 35 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 774f297..0897c97 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -222,11 +222,9 @@ class BucketWriter {
     String fullFileName = fileName + "." + counter;
 
-    if (codeC == null && fileSuffix != null && fileSuffix.length() > 0) {
+    if (fileSuffix != null && fileSuffix.length() > 0) {
       fullFileName += fileSuffix;
-    }
-
-    if(codeC != null) {
+    } else if (codeC != null) {
       fullFileName += codeC.getDefaultExtension();
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 741ac90..f0a6e4b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -294,7 +294,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     return false;
   }
 
-  private static CompressionCodec getCodec(String codecName) {
+  @VisibleForTesting
+  static CompressionCodec getCodec(String codecName) {
     Configuration conf = new Configuration();
     List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
         .getCodecClasses(conf);

http://git-wip-us.apache.org/repos/asf/flume/blob/2f6fea50/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 99e787e..f741e03 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -280,6 +280,37 @@ public class TestBucketWriter {
   }
 
   @Test
+  public void testFileSuffixCompressed()
+      throws IOException, InterruptedException {
+    final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
+    final String suffix = ".foo";
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx,
+        "/tmp", "file", "", ".tmp", suffix, HDFSEventSink.getCodec("gzip"),
+        SequenceFile.CompressionType.BLOCK, hdfsWriter,
+        timedRollerPool, null,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
+
+    // Need to override system time use for test so we know what to expect
+    final long testTime = System.currentTimeMillis();
+
+    Clock testClock = new Clock() {
+      public long currentTimeMillis() {
+        return testTime;
+      }
+    };
+    bucketWriter.setClock(testClock);
+
+    Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
+    bucketWriter.append(e);
+
+    Assert.assertTrue("Incorrect suffix",hdfsWriter.getOpenedFilePath()
+        .endsWith(Long.toString(testTime+1) + suffix + ".tmp"));
+  }
+
+  @Test
   public void testInUsePrefix() throws IOException, InterruptedException {
     final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
     final String PREFIX = "BRNO_IS_CITY_IN_CZECH_REPUBLIC";
