Repository: flume Updated Branches: refs/heads/trunk 4b44dfc64 -> 2399329ee
FLUME-3002. Fix tests in TestBucketWriter. Some tests in TestBucketWriter are flaky. This commit fixes that flakiness by adding a new constructor with an extra Clock parameter. Reviewers: Attila Simon, Denes Arvay Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2399329e Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2399329e Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2399329e Branch: refs/heads/trunk Commit: 2399329ee2ca2d9fc4ec0ec8fc5d16fb213795b2 Parents: 4b44dfc Author: Bessenyei Balázs Donát <[email protected]> Authored: Mon Oct 24 14:27:08 2016 +0200 Committer: Bessenyei Balázs Donát <[email protected]> Committed: Mon Oct 24 14:27:08 2016 +0200 ---------------------------------------------------------------------- .../apache/flume/sink/hdfs/BucketWriter.java | 25 ++++++++--- .../flume/sink/hdfs/TestBucketWriter.java | 47 ++++++++++---------- 2 files changed, 44 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2399329e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index b096410..300496a 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -106,7 +106,6 @@ class BucketWriter { private boolean mockFsInjected = false; - private Clock clock = new SystemClock(); private final long retryInterval; private final int maxRenameTries; @@ -124,6 +123,26 @@ class BucketWriter { String onCloseCallbackPath, long 
callTimeout, ExecutorService callTimeoutPool, long retryInterval, int maxCloseTries) { + this(rollInterval, rollSize, rollCount, batchSize, + context, filePath, fileName, inUsePrefix, + inUseSuffix, fileSuffix, codeC, + compType, writer, + timedRollerPool, proxyUser, + sinkCounter, idleTimeout, onCloseCallback, + onCloseCallbackPath, callTimeout, + callTimeoutPool, retryInterval, + maxCloseTries, new SystemClock()); + } + + BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, + Context context, String filePath, String fileName, String inUsePrefix, + String inUseSuffix, String fileSuffix, CompressionCodec codeC, + CompressionType compType, HDFSWriter writer, + ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser, + SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback, + String onCloseCallbackPath, long callTimeout, + ExecutorService callTimeoutPool, long retryInterval, + int maxCloseTries, Clock clock) { this.rollInterval = rollInterval; this.rollSize = rollSize; this.rollCount = rollCount; @@ -634,10 +653,6 @@ class BucketWriter { return (batchCounter == 0); } - void setClock(Clock clock) { - this.clock = clock; - } - /** * This method if the current thread has been interrupted and throws an * exception. 
http://git-wip-us.apache.org/repos/asf/flume/blob/2399329e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index 742deb0..78241a1 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -249,13 +249,6 @@ public class TestBucketWriter { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String suffix = null; - MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter( - ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, - SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, - Executors.newSingleThreadExecutor(), 0, 0); - // Need to override system time use for test so we know what to expect final long testTime = System.currentTimeMillis(); Clock testClock = new Clock() { @@ -263,7 +256,13 @@ public class TestBucketWriter { return testTime; } }; - bucketWriter.setClock(testClock); + + MockHDFSWriter hdfsWriter = new MockHDFSWriter(); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0, testClock); Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); @@ -277,13 +276,6 @@ public class 
TestBucketWriter { final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test final String suffix = ".avro"; - MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter( - ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, - SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, - new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, - Executors.newSingleThreadExecutor(), 0, 0); - // Need to override system time use for test so we know what to expect final long testTime = System.currentTimeMillis(); @@ -293,7 +285,14 @@ public class TestBucketWriter { return testTime; } }; - bucketWriter.setClock(testClock); + + MockHDFSWriter hdfsWriter = new MockHDFSWriter(); + BucketWriter bucketWriter = new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null, + SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000, + Executors.newSingleThreadExecutor(), 0, 0, testClock); + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e); @@ -309,12 +308,6 @@ public class TestBucketWriter { final String suffix = ".foo"; MockHDFSWriter hdfsWriter = new MockHDFSWriter(); - BucketWriter bucketWriter = new BucketWriter( - ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, - HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter, - timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), - 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0 - ); // Need to override system time use for test so we know what to expect final long testTime = System.currentTimeMillis(); @@ -324,7 +317,15 @@ public class TestBucketWriter { return testTime; } }; - bucketWriter.setClock(testClock); + + BucketWriter bucketWriter = 
new BucketWriter( + ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, + HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter, + timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), + 0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0, testClock + ); + + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); bucketWriter.append(e);
