Repository: flume
Updated Branches:
  refs/heads/trunk 4b44dfc64 -> 2399329ee


FLUME-3002. Fix tests in TestBucketWriter

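Some tests in TestBucketWriter are flaky.

This commit fixes that flakiness by adding a new BucketWriter constructor
that takes an extra Clock parameter. The tests now pass their fixed test
clock to the constructor, which replaces the removed setClock() method.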

Reviewers: Attila Simon, Denes Arvay


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2399329e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2399329e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2399329e

Branch: refs/heads/trunk
Commit: 2399329ee2ca2d9fc4ec0ec8fc5d16fb213795b2
Parents: 4b44dfc
Author: Bessenyei Balázs Donát <[email protected]>
Authored: Mon Oct 24 14:27:08 2016 +0200
Committer: Bessenyei Balázs Donát <[email protected]>
Committed: Mon Oct 24 14:27:08 2016 +0200

----------------------------------------------------------------------
 .../apache/flume/sink/hdfs/BucketWriter.java    | 25 ++++++++---
 .../flume/sink/hdfs/TestBucketWriter.java       | 47 ++++++++++----------
 2 files changed, 44 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/2399329e/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index b096410..300496a 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -106,7 +106,6 @@ class BucketWriter {
 
   private boolean mockFsInjected = false;
 
-  private Clock clock = new SystemClock();
   private final long retryInterval;
   private final int maxRenameTries;
 
@@ -124,6 +123,26 @@ class BucketWriter {
       String onCloseCallbackPath, long callTimeout,
       ExecutorService callTimeoutPool, long retryInterval,
       int maxCloseTries) {
+    this(rollInterval, rollSize, rollCount, batchSize,
+            context, filePath, fileName, inUsePrefix,
+            inUseSuffix, fileSuffix, codeC,
+            compType, writer,
+            timedRollerPool, proxyUser,
+            sinkCounter, idleTimeout, onCloseCallback,
+            onCloseCallbackPath, callTimeout,
+            callTimeoutPool, retryInterval,
+            maxCloseTries, new SystemClock());
+  }
+
+  BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
+           Context context, String filePath, String fileName, String inUsePrefix,
+           String inUseSuffix, String fileSuffix, CompressionCodec codeC,
+           CompressionType compType, HDFSWriter writer,
+           ScheduledExecutorService timedRollerPool, PrivilegedExecutor proxyUser,
+           SinkCounter sinkCounter, int idleTimeout, WriterCallback onCloseCallback,
+           String onCloseCallbackPath, long callTimeout,
+           ExecutorService callTimeoutPool, long retryInterval,
+           int maxCloseTries, Clock clock) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -634,10 +653,6 @@ class BucketWriter {
     return (batchCounter == 0);
   }
 
-  void setClock(Clock clock) {
-    this.clock = clock;
-  }
-
   /**
    * This method if the current thread has been interrupted and throws an
    * exception.

http://git-wip-us.apache.org/repos/asf/flume/blob/2399329e/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index 742deb0..78241a1 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -249,13 +249,6 @@ public class TestBucketWriter {
     final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
     final String suffix = null;
 
-    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
-
     // Need to override system time use for test so we know what to expect
     final long testTime = System.currentTimeMillis();
     Clock testClock = new Clock() {
@@ -263,7 +256,13 @@ public class TestBucketWriter {
         return testTime;
       }
     };
-    bucketWriter.setClock(testClock);
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0, testClock);
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -277,13 +276,6 @@ public class TestBucketWriter {
     final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test
     final String suffix = ".avro";
 
-    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
-        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
-        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
-        Executors.newSingleThreadExecutor(), 0, 0);
-
     // Need to override system time use for test so we know what to expect
 
     final long testTime = System.currentTimeMillis();
@@ -293,7 +285,14 @@ public class TestBucketWriter {
         return testTime;
       }
     };
-    bucketWriter.setClock(testClock);
+
+    MockHDFSWriter hdfsWriter = new MockHDFSWriter();
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix, null,
+        SequenceFile.CompressionType.NONE, hdfsWriter, timedRollerPool, proxy,
+        new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0, null, null, 30000,
+        Executors.newSingleThreadExecutor(), 0, 0, testClock);
+
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -309,12 +308,6 @@ public class TestBucketWriter {
     final String suffix = ".foo";
 
     MockHDFSWriter hdfsWriter = new MockHDFSWriter();
-    BucketWriter bucketWriter = new BucketWriter(
-        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix,
-        HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter,
-        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0
-    );
 
     // Need to override system time use for test so we know what to expect
     final long testTime = System.currentTimeMillis();
@@ -324,7 +317,15 @@ public class TestBucketWriter {
         return testTime;
       }
     };
-    bucketWriter.setClock(testClock);
+
+    BucketWriter bucketWriter = new BucketWriter(
+        ROLL_INTERVAL, 0, 0, 0, ctx, "/tmp", "file", "", ".tmp", suffix,
+        HDFSEventSink.getCodec("gzip"), SequenceFile.CompressionType.BLOCK, hdfsWriter,
+        timedRollerPool, proxy, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
+        0, null, null, 30000, Executors.newSingleThreadExecutor(), 0, 0, testClock
+    );
+
+
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);

Reply via email to