http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
index 7851536..8921a19 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannel.java
@@ -19,10 +19,6 @@
 
 package org.apache.flume.channel;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -34,8 +30,12 @@ import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import static org.fest.reflect.core.Reflection.*;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingDeque;
+
+import static org.fest.reflect.core.Reflection.field;
 
 public class TestMemoryChannel {
 
@@ -81,7 +81,7 @@ public class TestMemoryChannel {
 
     Transaction transaction = channel.getTransaction();
     transaction.begin();
-    for(int i=0; i < 5; i++) {
+    for (int i = 0; i < 5; i++) {
       channel.put(EventBuilder.withBody(String.format("test event %d", 
i).getBytes()));
     }
     transaction.commit();
@@ -124,7 +124,7 @@ public class TestMemoryChannel {
     parms.put("transactionCapacity", "2");
     context.putAll(parms);
     Configurables.configure(channel, context);
-    for(int i=0; i < 6; i++) {
+    for (int i = 0; i < 6; i++) {
       transaction = channel.getTransaction();
       transaction.begin();
       Assert.assertNotNull(channel.take());
@@ -133,7 +133,7 @@ public class TestMemoryChannel {
     }
   }
 
-  @Test(expected=ChannelException.class)
+  @Test(expected = ChannelException.class)
   public void testTransactionPutCapacityOverload() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();
@@ -151,7 +151,7 @@ public class TestMemoryChannel {
     Assert.fail();
   }
 
-  @Test(expected=ChannelException.class)
+  @Test(expected = ChannelException.class)
   public void testCapacityOverload() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();
@@ -236,7 +236,7 @@ public class TestMemoryChannel {
     tx.close();
   }
 
-  @Test(expected=ChannelException.class)
+  @Test(expected = ChannelException.class)
   public void testByteCapacityOverload() {
     Context context = new Context();
     Map<String, String> parms = new HashMap<String, String>();
@@ -284,8 +284,7 @@ public class TestMemoryChannel {
     try {
       channel.put(EventBuilder.withBody(eventBody));
       throw new RuntimeException("Put was able to overflow byte capacity.");
-    } catch (ChannelException ce)
-    {
+    } catch (ChannelException ce) {
       //Do nothing
     }
 
@@ -306,8 +305,7 @@ public class TestMemoryChannel {
     try {
       channel.put(EventBuilder.withBody(eventBody));
       throw new RuntimeException("Put was able to overflow byte capacity.");
-    } catch (ChannelException ce)
-    {
+    } catch (ChannelException ce) {
       //Do nothing
     }
     tx.commit();
@@ -370,7 +368,7 @@ public class TestMemoryChannel {
       channel.put(EventBuilder.withBody(eventBody));
       tx.commit();
       Assert.fail();
-    } catch ( ChannelException e ) {
+    } catch (ChannelException e) {
       //success
       tx.rollback();
     } finally {
@@ -397,12 +395,12 @@ public class TestMemoryChannel {
     tx = channel.getTransaction();
     tx.begin();
     try {
-      for(int i = 0; i < 2; i++) {
+      for (int i = 0; i < 2; i++) {
         channel.put(EventBuilder.withBody(eventBody));
       }
       tx.commit();
       Assert.fail();
-    } catch ( ChannelException e ) {
+    } catch (ChannelException e) {
       //success
       tx.rollback();
     } finally {
@@ -418,12 +416,12 @@ public class TestMemoryChannel {
     tx.begin();
 
     try {
-      for(int i = 0; i < 15; i++) {
+      for (int i = 0; i < 15; i++) {
         channel.put(EventBuilder.withBody(eventBody));
       }
       tx.commit();
       Assert.fail();
-    } catch ( ChannelException e ) {
+    } catch (ChannelException e) {
       //success
       tx.rollback();
     } finally {
@@ -438,12 +436,12 @@ public class TestMemoryChannel {
     tx.begin();
 
     try {
-      for(int i = 0; i < 25; i++) {
+      for (int i = 0; i < 25; i++) {
         channel.put(EventBuilder.withBody(eventBody));
       }
       tx.commit();
       Assert.fail();
-    } catch ( ChannelException e ) {
+    } catch (ChannelException e) {
       //success
       tx.rollback();
     } finally {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
index d4ba705..68aa117 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelConcurrency.java
@@ -17,15 +17,6 @@
  */
 package org.apache.flume.channel;
 
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -37,6 +28,15 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class TestMemoryChannelConcurrency {
 
   private CyclicBarrier barrier;
@@ -120,10 +120,10 @@ public class TestMemoryChannelConcurrency {
   }
 
   /**
-   * Works with a startgate/endgate latches to make sure all threads run at 
the same time. Threads randomly
-   * choose to commit or rollback random numbers of actions, tagging them with 
the thread no.
-   * The correctness check is made by recording committed entries into a map, 
and verifying the count
-   * after the endgate
+   * Works with a startgate/endgate latches to make sure all threads run at 
the same time.
+   * Threads randomly choose to commit or rollback random numbers of actions, 
tagging them with the
+   * thread no. The correctness check is made by recording committed entries 
into a map, and
+   * verifying the count after the endgate.
    * Since nothing is taking the puts out, allow for a big capacity
    *
    * @throws InterruptedException
@@ -135,7 +135,8 @@ public class TestMemoryChannelConcurrency {
     context.put("keep-alive", "1");
     context.put("capacity", "5000"); // theoretical maximum of 100 threads * 
10 * 5
     // because we're just grabbing the whole lot in one commit
-    // normally a transactionCapacity significantly lower than the channel 
capacity would be recommended
+    // normally a transactionCapacity significantly lower than the channel 
capacity would be
+    // recommended
     context.put("transactionCapacity", "5000");
     Configurables.configure(channel, context);
     final ConcurrentHashMap<String, AtomicInteger> committedPuts =
@@ -158,17 +159,17 @@ public class TestMemoryChannelConcurrency {
           } catch (InterruptedException e1) {
             Thread.currentThread().interrupt();
           }
-          for(int j = 0; j < 10; j++) {
+          for (int j = 0; j < 10; j++) {
             int events = rng.nextInt(5) + 1;
             Transaction tx = channel.getTransaction();
             tx.begin();
-            for(int k = 0; k < events; k++) {
+            for (int k = 0; k < events; k++) {
               channel.put(EventBuilder.withBody(strtid.getBytes()));
             }
-            if(rng.nextBoolean()) {
+            if (rng.nextBoolean()) {
               tx.commit();
               AtomicInteger tcount = committedPuts.get(strtid);
-              if(tcount == null) {
+              if (tcount == null) {
                 committedPuts.put(strtid, new AtomicInteger(events));
               } else {
                 tcount.addAndGet(events);
@@ -186,7 +187,7 @@ public class TestMemoryChannelConcurrency {
     startGate.countDown();
     endGate.await();
 
-    if(committedPuts.isEmpty()) {
+    if (committedPuts.isEmpty()) {
       Assert.fail();
     }
 
@@ -194,17 +195,17 @@ public class TestMemoryChannelConcurrency {
     Transaction tx = channel.getTransaction();
     tx.begin();
     Event e;
-    while((e = channel.take()) != null) {
+    while ((e = channel.take()) != null) {
       String index = new String(e.getBody());
       AtomicInteger remain = committedPuts.get(index);
       int post = remain.decrementAndGet();
-      if(post == 0) {
+      if (post == 0) {
         committedPuts.remove(index);
       }
     }
     tx.commit();
     tx.close();
-    if(!committedPuts.isEmpty()) {
+    if (!committedPuts.isEmpty()) {
       Assert.fail();
     }
   }
@@ -216,10 +217,12 @@ public class TestMemoryChannelConcurrency {
     context.put("keep-alive", "1");
     context.put("capacity", "100"); // theoretical maximum of 100 threads * 10 
* 5
     // because we're just grabbing the whole lot in one commit
-    // normally a transactionCapacity significantly lower than the channel 
capacity would be recommended
+    // normally a transactionCapacity significantly lower than the channel 
capacity would be
+    // recommended
     context.put("transactionCapacity", "100");
     Configurables.configure(channel, context);
-    final ConcurrentHashMap<String, AtomicInteger> committedPuts = new 
ConcurrentHashMap<String, AtomicInteger>();
+    final ConcurrentHashMap<String, AtomicInteger> committedPuts =
+        new ConcurrentHashMap<String, AtomicInteger>();
     final ConcurrentHashMap<String, AtomicInteger> committedTakes =
         new ConcurrentHashMap<String, AtomicInteger>();
 
@@ -228,7 +231,7 @@ public class TestMemoryChannelConcurrency {
     final CountDownLatch endGate = new CountDownLatch(threadCount);
 
     // start a sink and source for each
-    for (int i = 0; i < threadCount/2; i++) {
+    for (int i = 0; i < threadCount / 2; i++) {
       Thread t = new Thread() {
         @Override
         public void run() {
@@ -241,23 +244,23 @@ public class TestMemoryChannelConcurrency {
           } catch (InterruptedException e1) {
             Thread.currentThread().interrupt();
           }
-          for(int j = 0; j < 10; j++) {
+          for (int j = 0; j < 10; j++) {
             int events = rng.nextInt(5) + 1;
             Transaction tx = channel.getTransaction();
             tx.begin();
-            for(int k = 0; k < events; k++) {
+            for (int k = 0; k < events; k++) {
               channel.put(EventBuilder.withBody(strtid.getBytes()));
             }
-            if(rng.nextBoolean()) {
+            if (rng.nextBoolean()) {
               try {
                 tx.commit();
                 AtomicInteger tcount = committedPuts.get(strtid);
-                if(tcount == null) {
+                if (tcount == null) {
                   committedPuts.put(strtid, new AtomicInteger(events));
                 } else {
                   tcount.addAndGet(events);
                 }
-              } catch(ChannelException e) {
+              } catch (ChannelException e) {
                 System.out.print("puts commit failed");
                 tx.rollback();
               }
@@ -282,25 +285,25 @@ public class TestMemoryChannelConcurrency {
           } catch (InterruptedException e1) {
             Thread.currentThread().interrupt();
           }
-          for(int j = 0; j < 10; j++) {
+          for (int j = 0; j < 10; j++) {
             int events = rng.nextInt(5) + 1;
             Transaction tx = channel.getTransaction();
             tx.begin();
             Event[] taken = new Event[events];
             int k;
-            for(k = 0; k < events; k++) {
+            for (k = 0; k < events; k++) {
               taken[k] = channel.take();
-              if(taken[k] == null) break;
+              if (taken[k] == null) break;
             }
-            if(rng.nextBoolean()) {
+            if (rng.nextBoolean()) {
               try {
                 tx.commit();
-                for(Event e : taken) {
-                  if(e == null) break;
+                for (Event e : taken) {
+                  if (e == null) break;
                   String index = new String(e.getBody());
-                  synchronized(takeMapLock) {
+                  synchronized (takeMapLock) {
                     AtomicInteger remain = committedTakes.get(index);
-                    if(remain == null) {
+                    if (remain == null) {
                       committedTakes.put(index, new AtomicInteger(1));
                     } else {
                       remain.incrementAndGet();
@@ -323,7 +326,7 @@ public class TestMemoryChannelConcurrency {
       t.start();
     }
     startGate.countDown();
-    if(!endGate.await(20, TimeUnit.SECONDS)) {
+    if (!endGate.await(20, TimeUnit.SECONDS)) {
       Assert.fail("Not all threads ended succesfully");
     }
 
@@ -333,11 +336,11 @@ public class TestMemoryChannelConcurrency {
     Event e;
     // first pull out what's left in the channel and remove it from the
     // committed map
-    while((e = channel.take()) != null) {
+    while ((e = channel.take()) != null) {
       String index = new String(e.getBody());
       AtomicInteger remain = committedPuts.get(index);
       int post = remain.decrementAndGet();
-      if(post == 0) {
+      if (post == 0) {
         committedPuts.remove(index);
       }
     }
@@ -345,14 +348,19 @@ public class TestMemoryChannelConcurrency {
     tx.close();
 
     // now just check the committed puts match the committed takes
-    for(Entry<String, AtomicInteger> takes : committedTakes.entrySet()) {
+    for (Entry<String, AtomicInteger> takes : committedTakes.entrySet()) {
       AtomicInteger count = committedPuts.get(takes.getKey());
-      if(count == null)
+      if (count == null) {
         Assert.fail("Putted data doesn't exist");
-      if(count.get() != takes.getValue().get())
-        Assert.fail(String.format("Mismatched put and take counts expected %d 
had %d", count.get(), takes.getValue().get()));
+      }
+      if (count.get() != takes.getValue().get()) {
+        Assert.fail(String.format("Mismatched put and take counts expected %d 
had %d",
+                                  count.get(), takes.getValue().get()));
+      }
       committedPuts.remove(takes.getKey());
     }
-    if(!committedPuts.isEmpty()) Assert.fail("Puts still has entries 
remaining");
+    if (!committedPuts.isEmpty()) {
+      Assert.fail("Puts still has entries remaining");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
index b8e00d8..55b81ee 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/channel/TestMemoryChannelTransaction.java
@@ -43,7 +43,8 @@ public class TestMemoryChannelTransaction {
   @Test
   public void testCommit() throws InterruptedException, EventDeliveryException 
{
 
-    Event event, event2;
+    Event event;
+    Event event2;
     Context context = new Context();
     int putCounter = 0;
 
@@ -85,7 +86,8 @@ public class TestMemoryChannelTransaction {
   public void testRollBack() throws InterruptedException,
       EventDeliveryException {
 
-    Event event, event2;
+    Event event;
+    Event event2;
     Context context = new Context();
     int putCounter = 0;
 
@@ -158,7 +160,8 @@ public class TestMemoryChannelTransaction {
   public void testReEntTxn() throws InterruptedException,
       EventDeliveryException {
 
-    Event event, event2;
+    Event event;
+    Event event2;
     Context context = new Context();
     int putCounter = 0;
 
@@ -199,7 +202,8 @@ public class TestMemoryChannelTransaction {
   @Test
   public void testReEntTxnRollBack() throws InterruptedException,
       EventDeliveryException {
-    Event event, event2;
+    Event event;
+    Event event2;
     Context context = new Context();
     int putCounter = 0;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
 
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 4e90054..fdc3ce9 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -37,14 +37,25 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.*;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
 
 public class TestReliableSpoolingFileEventReader {
 
-  private static final Logger logger = LoggerFactory.getLogger
-      (TestReliableSpoolingFileEventReader.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestReliableSpoolingFileEventReader.class);
 
   private static final File WORK_DIR = new File("target/test/work/" +
       TestReliableSpoolingFileEventReader.class.getSimpleName());
@@ -57,7 +68,7 @@ public class TestReliableSpoolingFileEventReader {
 
     // write out a few files
     for (int i = 0; i < 4; i++) {
-      File fileName = new File(WORK_DIR, "file"+i);
+      File fileName = new File(WORK_DIR, "file" + i);
       StringBuilder sb = new StringBuilder();
 
       // write as many lines as the index of the file
@@ -102,11 +113,12 @@ public class TestReliableSpoolingFileEventReader {
 
   @Test
   public void testIgnorePattern() throws IOException {
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
-        .spoolDirectory(WORK_DIR)
-        .ignorePattern("^file2$")
-        .deletePolicy(DeletePolicy.IMMEDIATE.toString())
-        .build();
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder()
+            .spoolDirectory(WORK_DIR)
+            .ignorePattern("^file2$")
+            .deletePolicy(DeletePolicy.IMMEDIATE.toString())
+            .build();
 
     List<File> before = listFiles(WORK_DIR);
     Assert.assertEquals("Expected 5, not: " + before, 5, before.size());
@@ -128,8 +140,9 @@ public class TestReliableSpoolingFileEventReader {
 
   @Test
   public void testRepeatedCallsWithCommitAlways() throws IOException {
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
-        .spoolDirectory(WORK_DIR).build();
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     .build();
 
     final int expectedLines = 0 + 1 + 2 + 3 + 1;
     int seenLines = 0;
@@ -148,8 +161,10 @@ public class TestReliableSpoolingFileEventReader {
         SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
     File trackerDir = new File(WORK_DIR, trackerDirPath);
 
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
-        .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.trackerDirPath(trackerDirPath)
+                                                     .build();
 
     final int expectedLines = 0 + 1 + 2 + 3 + 1;
     int seenLines = 0;
@@ -173,10 +188,10 @@ public class TestReliableSpoolingFileEventReader {
 
   @Test
   public void testFileDeletion() throws IOException {
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
-        .spoolDirectory(WORK_DIR)
-        .deletePolicy(DeletePolicy.IMMEDIATE.name())
-        .build();
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.deletePolicy(DeletePolicy.IMMEDIATE.name())
+                                                     .build();
 
     List<File> before = listFiles(WORK_DIR);
     Assert.assertEquals("Expected 5, not: " + before, 5, before.size());
@@ -197,29 +212,25 @@ public class TestReliableSpoolingFileEventReader {
 
   @Test(expected = NullPointerException.class)
   public void testNullConsumeOrder() throws IOException {
-    new ReliableSpoolingFileEventReader.Builder()
-    .spoolDirectory(WORK_DIR)
-    .consumeOrder(null)
-    .build();
+    new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                 .consumeOrder(null)
+                                                 .build();
   }
   
   @Test
   public void testConsumeFileRandomly() throws IOException {
-    ReliableEventReader reader
-      = new ReliableSpoolingFileEventReader.Builder()
-    .spoolDirectory(WORK_DIR)
-    .consumeOrder(ConsumeOrder.RANDOM)
-    .build();
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.consumeOrder(ConsumeOrder.RANDOM)
+                                                     .build();
     File fileName = new File(WORK_DIR, "new-file");
-    FileUtils.write(fileName,
-      "New file created in the end. Shoud be read randomly.\n");
+    FileUtils.write(fileName, "New file created in the end. Shoud be read 
randomly.\n");
     Set<String> actual = Sets.newHashSet();
     readEventsForFilesInDir(WORK_DIR, reader, actual);
     Set<String> expected = Sets.newHashSet();
     createExpectedFromFilesInSetup(expected);
     expected.add("");
-    expected.add(
-      "New file created in the end. Shoud be read randomly.");
+    expected.add("New file created in the end. Shoud be read randomly.");
     Assert.assertEquals(expected, actual);    
   }
 
@@ -229,54 +240,46 @@ public class TestReliableSpoolingFileEventReader {
     if (SystemUtils.IS_OS_WINDOWS) {
       return;
     }
-    final ReliableEventReader reader
-      = new ReliableSpoolingFileEventReader.Builder()
-      .spoolDirectory(WORK_DIR)
-      .consumeOrder(ConsumeOrder.RANDOM)
-      .build();
+    final ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.consumeOrder(ConsumeOrder.RANDOM)
+                                                     .build();
     File fileName = new File(WORK_DIR, "new-file");
-    FileUtils.write(fileName,
-      "New file created in the end. Shoud be read randomly.\n");
+    FileUtils.write(fileName, "New file created in the end. Shoud be read 
randomly.\n");
     Set<String> expected = Sets.newHashSet();
     int totalFiles = WORK_DIR.listFiles().length;
     final Set<String> actual = Sets.newHashSet();
     ExecutorService executor = Executors.newSingleThreadExecutor();
     final Semaphore semaphore1 = new Semaphore(0);
     final Semaphore semaphore2 = new Semaphore(0);
-    Future<Void> wait = executor.submit(
-      new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore1, 
semaphore2);
-          return null;
-        }
+    Future<Void> wait = executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore1, 
semaphore2);
+        return null;
       }
-    );
+    });
     semaphore1.acquire();
     File finalFile = new File(WORK_DIR, "t-file");
     FileUtils.write(finalFile, "Last file");
     semaphore2.release();
     wait.get();
-    int listFilesCount = ((ReliableSpoolingFileEventReader)reader)
-      .getListFilesCount();
+    int listFilesCount = 
((ReliableSpoolingFileEventReader)reader).getListFilesCount();
     finalFile.delete();
     createExpectedFromFilesInSetup(expected);
     expected.add("");
-    expected.add(
-      "New file created in the end. Shoud be read randomly.");
+    expected.add("New file created in the end. Shoud be read randomly.");
     expected.add("Last file");
     Assert.assertTrue(listFilesCount < (totalFiles + 2));
     Assert.assertEquals(expected, actual);
   }
 
-
   @Test
   public void testConsumeFileOldest() throws IOException, InterruptedException 
{
-    ReliableEventReader reader
-      = new ReliableSpoolingFileEventReader.Builder()
-      .spoolDirectory(WORK_DIR)
-      .consumeOrder(ConsumeOrder.OLDEST)
-      .build();
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.consumeOrder(ConsumeOrder.OLDEST)
+                                                     .build();
     File file1 = new File(WORK_DIR, "new-file1");   
     File file2 = new File(WORK_DIR, "new-file2");    
     File file3 = new File(WORK_DIR, "new-file3");
@@ -299,13 +302,11 @@ public class TestReliableSpoolingFileEventReader {
   }
   
   @Test
-  public void testConsumeFileYoungest()
-    throws IOException, InterruptedException {
-    ReliableEventReader reader
-      = new ReliableSpoolingFileEventReader.Builder()
-      .spoolDirectory(WORK_DIR)
-      .consumeOrder(ConsumeOrder.YOUNGEST)
-      .build();
+  public void testConsumeFileYoungest() throws IOException, 
InterruptedException {
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.consumeOrder(ConsumeOrder.YOUNGEST)
+                                                     .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
     File file3 = new File(WORK_DIR, "new-file3");
@@ -332,12 +333,11 @@ public class TestReliableSpoolingFileEventReader {
 
   @Test
   public void testConsumeFileOldestWithLexicographicalComparision()
-    throws IOException, InterruptedException {
-    ReliableEventReader reader
-      = new ReliableSpoolingFileEventReader.Builder()
-      .spoolDirectory(WORK_DIR)
-      .consumeOrder(ConsumeOrder.OLDEST)
-      .build();
+      throws IOException, InterruptedException {
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.consumeOrder(ConsumeOrder.OLDEST)
+                                                     .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
     File file3 = new File(WORK_DIR, "new-file3");
@@ -362,12 +362,11 @@ public class TestReliableSpoolingFileEventReader {
 
   @Test
   public void testConsumeFileYoungestWithLexicographicalComparision()
-    throws IOException, InterruptedException {
-    ReliableEventReader reader
-      = new ReliableSpoolingFileEventReader.Builder()
-      .spoolDirectory(WORK_DIR)
-      .consumeOrder(ConsumeOrder.YOUNGEST)
-      .build();
+      throws IOException, InterruptedException {
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.consumeOrder(ConsumeOrder.YOUNGEST)
+                                                     .build();
     File file1 = new File(WORK_DIR, "new-file1");
     File file2 = new File(WORK_DIR, "new-file2");
     File file3 = new File(WORK_DIR, "new-file3");
@@ -393,6 +392,7 @@ public class TestReliableSpoolingFileEventReader {
   @Test public void testLargeNumberOfFilesOLDEST() throws IOException {    
     templateTestForLargeNumberOfFiles(ConsumeOrder.OLDEST, null, 1000);
   }
+
   @Test public void testLargeNumberOfFilesYOUNGEST() throws IOException {    
     templateTestForLargeNumberOfFiles(ConsumeOrder.YOUNGEST, new 
Comparator<Long>() {
 
@@ -402,6 +402,7 @@ public class TestReliableSpoolingFileEventReader {
       }
     }, 1000);
   }
+
   @Test public void testLargeNumberOfFilesRANDOM() throws IOException {    
     templateTestForLargeNumberOfFiles(ConsumeOrder.RANDOM, null, 1000);
   }
@@ -409,19 +410,21 @@ public class TestReliableSpoolingFileEventReader {
   @Test
   public void testZeroByteTrackerFile() throws IOException {
     String trackerDirPath =
-            SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_TRACKER_DIR;
     File trackerDir = new File(WORK_DIR, trackerDirPath);
-    if(!trackerDir.exists()) {
+    if (!trackerDir.exists()) {
       trackerDir.mkdir();
     }
     File trackerFile = new File(trackerDir, 
ReliableSpoolingFileEventReader.metaFileName);
-    if(trackerFile.exists()) {
+    if (trackerFile.exists()) {
       trackerFile.delete();
     }
     trackerFile.createNewFile();
 
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
-            .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();
+    ReliableEventReader reader =
+        new ReliableSpoolingFileEventReader.Builder().spoolDirectory(WORK_DIR)
+                                                     
.trackerDirPath(trackerDirPath)
+                                                     .build();
     final int expectedLines = 1;
     int seenLines = 0;
     List<Event> events = reader.readEvents(10);
@@ -434,18 +437,16 @@ public class TestReliableSpoolingFileEventReader {
     Assert.assertEquals(expectedLines, seenLines);
   }
 
-  private void templateTestForLargeNumberOfFiles(ConsumeOrder order, 
-      Comparator<Long> comparator,
-      int N) throws IOException {
+  private void templateTestForLargeNumberOfFiles(ConsumeOrder order, 
Comparator<Long> comparator,
+                                                 int N) throws IOException {
     File dir = null;
     try {
-      dir = new File(
-        "target/test/work/" + this.getClass().getSimpleName() +
-          "_large");
+      dir = new File("target/test/work/" + this.getClass().getSimpleName() + 
"_large");
       Files.createParentDirs(new File(dir, "dummy"));
-      ReliableEventReader reader
-        = new ReliableSpoolingFileEventReader.Builder()
-      .spoolDirectory(dir).consumeOrder(order).build();
+      ReliableEventReader reader =
+          new ReliableSpoolingFileEventReader.Builder().spoolDirectory(dir)
+                                                       .consumeOrder(order)
+                                                       .build();
       Map<Long, List<String>> expected;
       if (comparator == null) {
         expected = new TreeMap<Long, List<String>>();
@@ -476,16 +477,14 @@ public class TestReliableSpoolingFileEventReader {
         List<Event> events;
         events = reader.readEvents(10);
         for (Event e : events) {
-          if (order == ConsumeOrder.RANDOM) {            
+          if (order == ConsumeOrder.RANDOM) {
             Assert.assertTrue(expectedList.remove(new String(e.getBody())));
           } else {
-            Assert.assertEquals(
-              ((ArrayList<String>) expectedList).get(0),
-              new String(e.getBody()));
+            Assert.assertEquals(((ArrayList<String>) expectedList).get(0), new 
String(e.getBody()));
             ((ArrayList<String>) expectedList).remove(0);
           }
         }
-        reader.commit();        
+        reader.commit();
       }
     } finally {
       deleteDir(dir);
@@ -493,23 +492,24 @@ public class TestReliableSpoolingFileEventReader {
   }
 
   private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
-    Collection<String> actual) throws IOException {
+                                       Collection<String> actual) throws 
IOException {
     readEventsForFilesInDir(dir, reader, actual, null, null);
   }
     
   /* Read events, one for each file in the given directory. */
-  private void readEventsForFilesInDir(File dir, ReliableEventReader reader, 
-      Collection<String> actual, Semaphore semaphore1, Semaphore semaphore2) 
throws IOException {
+  private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
+                                       Collection<String> actual, Semaphore 
semaphore1,
+                                       Semaphore semaphore2) throws 
IOException {
     List<Event> events;
     boolean executed = false;
-    for (int i=0; i < listFiles(dir).size(); i++) {
+    for (int i = 0; i < listFiles(dir).size(); i++) {
       events = reader.readEvents(10);
       for (Event e : events) {
         actual.add(new String(e.getBody()));
       }
       reader.commit();
       try {
-        if(!executed) {
+        if (!executed) {
           executed = true;
           if (semaphore1 != null) {
             semaphore1.release();
@@ -533,8 +533,7 @@ public class TestReliableSpoolingFileEventReader {
   }
   
   private static List<File> listFiles(File dir) {
-    List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter
-        () {
+    List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter() {
       @Override
       public boolean accept(File pathname) {
         return !pathname.isDirectory();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
 
b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
index 21b972b..b1b828a 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java
@@ -18,6 +18,12 @@
 
 package org.apache.flume.formatter.output;
 
+import org.apache.flume.Clock;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
@@ -26,21 +32,15 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.TimeZone;
 
-import org.apache.flume.Clock;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestBucketPath {
   Calendar cal;
   Map<String, String> headers;
+
   @Before
-  public void setUp(){
+  public void setUp() {
     cal = Calendar.getInstance();
     cal.set(2012, 5, 23, 13, 46, 33);
     cal.set(Calendar.MILLISECOND, 234);
@@ -49,7 +49,7 @@ public class TestBucketPath {
   }
 
   @Test
-  public void testDateFormatCache(){
+  public void testDateFormatCache() {
     TimeZone utcTimeZone = TimeZone.getTimeZone("UTC");
     String test = "%c";
     BucketPath.escapeString(
@@ -60,7 +60,7 @@ public class TestBucketPath {
     SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
     Date d = new Date(cal.getTimeInMillis());
     String expectedString = format.format(d);
-    System.out.println("Expected String: "+ expectedString);
+    System.out.println("Expected String: " + expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
 
@@ -76,7 +76,7 @@ public class TestBucketPath {
     SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
     Date d = new Date(cal2.getTimeInMillis());
     String expectedString = format.format(d);
-    System.out.println("Expected String: "+ expectedString);
+    System.out.println("Expected String: " + expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
 
@@ -89,8 +89,8 @@ public class TestBucketPath {
     Calendar cal2 = Calendar.getInstance();
     cal2.set(2012, 5, 23, 13, 45, 0);
     cal2.set(Calendar.MILLISECOND, 0);
-    String expectedString = String.valueOf(cal2.getTimeInMillis()/1000);
-    System.out.println("Expected String: "+ expectedString);
+    String expectedString = String.valueOf(cal2.getTimeInMillis() / 1000);
+    System.out.println("Expected String: " + expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
 
@@ -103,13 +103,13 @@ public class TestBucketPath {
     Calendar cal2 = Calendar.getInstance();
     cal2.set(2012, 5, 23, 13, 46, 30);
     cal2.set(Calendar.MILLISECOND, 0);
-    String expectedString = String.valueOf(cal2.getTimeInMillis()/1000);
-    System.out.println("Expected String: "+ expectedString);
+    String expectedString = String.valueOf(cal2.getTimeInMillis() / 1000);
+    System.out.println("Expected String: " + expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
 
   @Test
-  public void testNoRounding(){
+  public void testNoRounding() {
     String test = "%c";
     String escapedString = BucketPath.escapeString(
         test, headers, false, Calendar.HOUR_OF_DAY, 12);
@@ -117,19 +117,19 @@ public class TestBucketPath {
     SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy");
     Date d = new Date(cal.getTimeInMillis());
     String expectedString = format.format(d);
-    System.out.println("Expected String: "+ expectedString);
+    System.out.println("Expected String: " + expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
 
 
   @Test
-  public void testNoPadding(){
+  public void testNoPadding() {
     Calendar calender;
     Map<String, String> calender_timestamp;
     calender = Calendar.getInstance();
-    
+
     //Check single digit dates
-    calender.set(2014, (5-1), 3, 13, 46, 33);
+    calender.set(2014, (5 - 1), 3, 13, 46, 33);
     calender_timestamp = new HashMap<String, String>();
     calender_timestamp.put("timestamp", 
String.valueOf(calender.getTimeInMillis()));
     SimpleDateFormat format = new SimpleDateFormat("M-d");
@@ -141,19 +141,19 @@ public class TestBucketPath {
     String expectedString = format.format(d);
     
     //Check two digit dates
-    calender.set(2014, (11-1), 13, 13, 46, 33);
+    calender.set(2014, (11 - 1), 13, 13, 46, 33);
     calender_timestamp.put("timestamp", 
String.valueOf(calender.getTimeInMillis()));
-    escapedString +=  " " +  BucketPath.escapeString(
+    escapedString += " " + BucketPath.escapeString(
         test, calender_timestamp, false, Calendar.HOUR_OF_DAY, 12);
     System.out.println("Escaped String: " + escapedString);
     d = new Date(calender.getTimeInMillis());
-    expectedString +=  " " + format.format(d);
-    System.out.println("Expected String: "+ expectedString);
+    expectedString += " " + format.format(d);
+    System.out.println("Expected String: " + expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
 
   @Test
-  public void testDateFormatTimeZone(){
+  public void testDateFormatTimeZone() {
     TimeZone utcTimeZone = TimeZone.getTimeZone("UTC");
     String test = "%c";
     String escapedString = BucketPath.escapeString(
@@ -163,7 +163,7 @@ public class TestBucketPath {
     format.setTimeZone(utcTimeZone);
     Date d = new Date(cal.getTimeInMillis());
     String expectedString = format.format(d);
-    System.out.println("Expected String: "+ expectedString);
+    System.out.println("Expected String: " + expectedString);
     Assert.assertEquals(expectedString, escapedString);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
index b1f637f..7db535e 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/TestMonitoredCounterGroup.java
@@ -18,22 +18,15 @@
  */
 package org.apache.flume.instrumentation;
 
-import java.lang.management.ManagementFactory;
-import java.util.Random;
-
-import javax.management.AttributeNotFoundException;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanException;
-import javax.management.MBeanInfo;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.management.ReflectionException;
-
 import junit.framework.Assert;
-
 import org.junit.Before;
 import org.junit.Test;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.Random;
+
 public class TestMonitoredCounterGroup {
 
   private static final int MAX_BOUNDS = 1000;
@@ -61,7 +54,6 @@ public class TestMonitoredCounterGroup {
   private static final String SRC_ATTR_APPEND_BATCH_ACCEPTED_COUNT =
       "AppendBatchAcceptedCount";
 
-
   private static final String CH_ATTR_CHANNEL_SIZE = "ChannelSize";
   private static final String CH_ATTR_EVENT_PUT_ATTEMPT =
       "EventPutAttemptCount";
@@ -122,28 +114,28 @@ public class TestMonitoredCounterGroup {
     int eventDrainAttempt = random.nextInt(MAX_BOUNDS);
     int eventDrainSuccess = random.nextInt(MAX_BOUNDS);
 
-    for (int i = 0; i<connCreated; i++) {
+    for (int i = 0; i < connCreated; i++) {
       skc.incrementConnectionCreatedCount();
     }
-    for (int i = 0; i<connClosed; i++) {
+    for (int i = 0; i < connClosed; i++) {
       skc.incrementConnectionClosedCount();
     }
-    for (int i = 0; i<connFailed; i++) {
+    for (int i = 0; i < connFailed; i++) {
       skc.incrementConnectionFailedCount();
     }
-    for (int i = 0; i<batchEmpty; i++) {
+    for (int i = 0; i < batchEmpty; i++) {
       skc.incrementBatchEmptyCount();
     }
-    for (int i = 0; i<batchUnderflow; i++) {
+    for (int i = 0; i < batchUnderflow; i++) {
       skc.incrementBatchUnderflowCount();
     }
-    for (int i = 0; i<batchComplete; i++) {
+    for (int i = 0; i < batchComplete; i++) {
       skc.incrementBatchCompleteCount();
     }
-    for (int i = 0; i<eventDrainAttempt; i++) {
+    for (int i = 0; i < eventDrainAttempt; i++) {
       skc.incrementEventDrainAttemptCount();
     }
-    for (int i = 0; i<eventDrainSuccess; i++) {
+    for (int i = 0; i < eventDrainSuccess; i++) {
       skc.incrementEventDrainSuccessCount();
     }
 
@@ -204,10 +196,10 @@ public class TestMonitoredCounterGroup {
     int numEventTakeSuccess = random.nextInt(MAX_BOUNDS);
 
     chc.setChannelSize(numChannelSize);
-    for (int i = 0; i<numEventPutAttempt; i++) {
+    for (int i = 0; i < numEventPutAttempt; i++) {
       chc.incrementEventPutAttemptCount();
     }
-    for (int i = 0; i<numEventTakeAttempt; i++) {
+    for (int i = 0; i < numEventTakeAttempt; i++) {
       chc.incrementEventTakeAttemptCount();
     }
     chc.addToEventPutSuccessCount(numEventPutSuccess);
@@ -264,16 +256,16 @@ public class TestMonitoredCounterGroup {
 
     srcc.addToEventReceivedCount(numEventReceived);
     srcc.addToEventAcceptedCount(numEventAccepted);
-    for (int i = 0; i<numAppendReceived; i++) {
+    for (int i = 0; i < numAppendReceived; i++) {
       srcc.incrementAppendReceivedCount();
     }
-    for (int i = 0; i<numAppendAccepted; i++) {
+    for (int i = 0; i < numAppendAccepted; i++) {
       srcc.incrementAppendAcceptedCount();
     }
-    for (int i = 0; i<numAppendBatchReceived; i++) {
+    for (int i = 0; i < numAppendBatchReceived; i++) {
       srcc.incrementAppendBatchReceivedCount();
     }
-    for (int i = 0; i<numAppendBatchAccepted; i++) {
+    for (int i = 0; i < numAppendBatchAccepted; i++) {
       srcc.incrementAppendBatchAcceptedCount();
     }
 
@@ -302,11 +294,11 @@ public class TestMonitoredCounterGroup {
     int numEventReceived2 = random.nextInt(MAX_BOUNDS);
     int numEventAccepted2 = random.nextInt(MAX_BOUNDS);
 
-    for (int i = 0; i<numEventReceived2; i++) {
+    for (int i = 0; i < numEventReceived2; i++) {
       srcc.incrementEventReceivedCount();
     }
 
-    for (int i = 0; i<numEventAccepted2; i++) {
+    for (int i = 0; i < numEventAccepted2; i++) {
       srcc.incrementEventAcceptedCount();
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
index eb2d02d..09d419f 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/http/TestHTTPMetricsServer.java
@@ -20,12 +20,6 @@ package org.apache.flume.instrumentation.http;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.lang.reflect.Type;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Map;
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Transaction;
@@ -39,6 +33,12 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import javax.servlet.http.HttpServletResponse;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.lang.reflect.Type;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
 
 /**
  *
@@ -47,9 +47,7 @@ public class TestHTTPMetricsServer {
 
   Channel memChannel = new MemoryChannel();
   Channel pmemChannel = new PseudoTxnMemoryChannel();
-  Type mapType =
-          new TypeToken<Map<String, Map<String, String>>>() {
-          }.getType();
+  Type mapType = new TypeToken<Map<String, Map<String, String>>>() 
{}.getType();
   Gson gson = new Gson();
 
   @Test
@@ -99,7 +97,7 @@ public class TestHTTPMetricsServer {
   private void testWithPort(int port) throws Exception {
     MonitorService srv = new HTTPMetricsServer();
     Context context = new Context();
-    if(port > 1024){
+    if (port > 1024) {
       context.put(HTTPMetricsServer.CONFIG_PORT, String.valueOf(port));
     } else {
       port = HTTPMetricsServer.DEFAULT_PORT;
@@ -139,8 +137,7 @@ public class TestHTTPMetricsServer {
     doTestForbiddenMethods(4432,"OPTIONS");
   }
 
-  public void doTestForbiddenMethods(int port, String method)
-    throws Exception {
+  public void doTestForbiddenMethods(int port, String method) throws Exception 
{
     MonitorService srv = new HTTPMetricsServer();
     Context context = new Context();
     if (port > 1024) {
@@ -154,8 +151,7 @@ public class TestHTTPMetricsServer {
     URL url = new URL("http://0.0.0.0:"; + String.valueOf(port) + "/metrics");
     HttpURLConnection conn = (HttpURLConnection) url.openConnection();
     conn.setRequestMethod(method);
-    Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN,
-      conn.getResponseCode());
+    Assert.assertEquals(HttpServletResponse.SC_FORBIDDEN, 
conn.getResponseCode());
     srv.stop();
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
index 4a71265..6d64c53 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/instrumentation/kafka/KafkaSourceCounterTest.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -23,41 +23,41 @@ import org.junit.Test;
 
 public class KafkaSourceCounterTest {
 
-    KafkaSourceCounter counter;
-
-    @Before
-    public void setUp() throws Exception {
-        counter = new KafkaSourceCounter("test");
-    }
-
-    @Test
-    public void testAddToKafkaEventGetTimer() throws Exception {
-        Assert.assertEquals(1L, counter.addToKafkaEventGetTimer(1L));
-    }
-
-    @Test
-    public void testAddToKafkaCommitTimer() throws Exception {
-        Assert.assertEquals(1L, counter.addToKafkaCommitTimer(1L));
-    }
-
-    @Test
-    public void testIncrementKafkaEmptyCount() throws Exception {
-        Assert.assertEquals(1L, counter.incrementKafkaEmptyCount());
-    }
-
-    @Test
-    public void testGetKafkaCommitTimer() throws Exception {
-        Assert.assertEquals(0, counter.getKafkaCommitTimer());
-    }
-
-    @Test
-    public void testGetKafkaEventGetTimer() throws Exception {
-        Assert.assertEquals(0, counter.getKafkaEventGetTimer());
-    }
-
-    @Test
-    public void testGetKafkaEmptyCount() throws Exception {
-        Assert.assertEquals(0, counter.getKafkaEmptyCount());
-    }
+  KafkaSourceCounter counter;
+
+  @Before
+  public void setUp() throws Exception {
+    counter = new KafkaSourceCounter("test");
+  }
+
+  @Test
+  public void testAddToKafkaEventGetTimer() throws Exception {
+    Assert.assertEquals(1L, counter.addToKafkaEventGetTimer(1L));
+  }
+
+  @Test
+  public void testAddToKafkaCommitTimer() throws Exception {
+    Assert.assertEquals(1L, counter.addToKafkaCommitTimer(1L));
+  }
+
+  @Test
+  public void testIncrementKafkaEmptyCount() throws Exception {
+    Assert.assertEquals(1L, counter.incrementKafkaEmptyCount());
+  }
+
+  @Test
+  public void testGetKafkaCommitTimer() throws Exception {
+    Assert.assertEquals(0, counter.getKafkaCommitTimer());
+  }
+
+  @Test
+  public void testGetKafkaEventGetTimer() throws Exception {
+    Assert.assertEquals(0, counter.getKafkaEventGetTimer());
+  }
+
+  @Test
+  public void testGetKafkaEmptyCount() throws Exception {
+    Assert.assertEquals(0, counter.getKafkaEmptyCount());
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
index ac46131..dd42079 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorMillisSerializer.java
@@ -18,7 +18,6 @@
 package org.apache.flume.interceptor;
 
 import junit.framework.Assert;
-
 import org.apache.flume.Context;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -29,7 +28,8 @@ public class TestRegexExtractorInterceptorMillisSerializer {
   @Test
   public void shouldRequirePatternInConfiguration() {
     try {
-      RegexExtractorInterceptorMillisSerializer fixture = new 
RegexExtractorInterceptorMillisSerializer();
+      RegexExtractorInterceptorMillisSerializer fixture =
+          new RegexExtractorInterceptorMillisSerializer();
       fixture.configure(new Context());
       Assert.fail();
     } catch (IllegalArgumentException ex) {
@@ -40,7 +40,8 @@ public class TestRegexExtractorInterceptorMillisSerializer {
   @Test
   public void shouldRequireValidPatternInConfiguration() {
     try {
-      RegexExtractorInterceptorMillisSerializer fixture = new 
RegexExtractorInterceptorMillisSerializer();
+      RegexExtractorInterceptorMillisSerializer fixture =
+          new RegexExtractorInterceptorMillisSerializer();
       Context context = new Context();
       context.put("pattern", "ABCDEFG");
       fixture.configure(context);
@@ -52,7 +53,8 @@ public class TestRegexExtractorInterceptorMillisSerializer {
 
   @Test
   public void shouldReturnMillisFromPattern() {
-    RegexExtractorInterceptorMillisSerializer fixture = new 
RegexExtractorInterceptorMillisSerializer();
+    RegexExtractorInterceptorMillisSerializer fixture =
+        new RegexExtractorInterceptorMillisSerializer();
     Context context = new Context();
     String pattern = "yyyy-MM-dd HH:mm:ss";
     context.put("pattern", pattern);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
index 569c274..33003e6 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestRegexExtractorInterceptorPassThroughSerializer.java
@@ -26,7 +26,8 @@ public class 
TestRegexExtractorInterceptorPassThroughSerializer {
 
   @Test
   public void shouldReturnSameValue() {
-    RegexExtractorInterceptorPassThroughSerializer fixture = new 
RegexExtractorInterceptorPassThroughSerializer();
+    RegexExtractorInterceptorPassThroughSerializer fixture =
+        new RegexExtractorInterceptorPassThroughSerializer();
     fixture.configure(new Context());
     String input = "testing (1,2,3,4)";
     Assert.assertEquals(input, fixture.serialize(input));

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
 
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
index 2ab15f5..616b86b 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/interceptor/TestSearchAndReplaceInterceptor.java
@@ -35,7 +35,7 @@ public class TestSearchAndReplaceInterceptor {
 
   private void testSearchReplace(Context context, String input, String output)
       throws Exception {
-   Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
+    Interceptor.Builder builder = InterceptorBuilderFactory.newInstance(
         InterceptorType.SEARCH_REPLACE.toString());
     builder.configure(context);
     Interceptor interceptor = builder.build();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
index 896eced..05af3b1 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/SyslogAvroEventSerializer.java
@@ -231,20 +231,45 @@ public class SyslogAvroEventSerializer
     private String hostname = "";
     private String message = "";
 
-    public void setFacility(int f) { facility = f; }
-    public int getFacility() { return facility; }
+    public void setFacility(int f) {
+      facility = f;
+    }
+
+    public int getFacility() {
+      return facility;
+    }
+
+    public void setSeverity(int s) {
+      severity = s;
+    }
 
-    public void setSeverity(int s) { severity = s; }
-    public int getSeverity() { return severity; }
+    public int getSeverity() {
+      return severity;
+    }
 
-    public void setTimestamp(long t) { timestamp = t; }
-    public long getTimestamp() { return timestamp; }
+    public void setTimestamp(long t) {
+      timestamp = t;
+    }
 
-    public void setHostname(String h) { hostname = h; }
-    public String getHostname() { return hostname; }
+    public long getTimestamp() {
+      return timestamp;
+    }
 
-    public void setMessage(String m) { message = m; }
-    public String getMessage() { return message; }
+    public void setHostname(String h) {
+      hostname = h;
+    }
+
+    public String getHostname() {
+      return hostname;
+    }
+
+    public void setMessage(String m) {
+      message = m;
+    }
+
+    public String getMessage() {
+      return message;
+    }
 
     @Override
     public String toString() {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
index 6f9ddc2..b95433f 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestAvroEventDeserializer.java
@@ -49,6 +49,7 @@ public class TestAvroEventDeserializer {
       LoggerFactory.getLogger(TestAvroEventDeserializer.class);
 
   private static final Schema schema;
+
   static {
     schema = Schema.createRecord("MyRecord", "", "org.apache.flume",  false);
     Schema.Field field = new Schema.Field("foo",

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java
 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java
index e52affb..0c76cc9 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestDurablePositionTracker.java
@@ -31,8 +31,7 @@ import java.net.URL;
 
 public class TestDurablePositionTracker {
 
-  private static final Logger logger = LoggerFactory.getLogger
-      (TestDurablePositionTracker.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(TestDurablePositionTracker.class);
 
   @Test
   public void testBasicTracker() throws IOException {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java
index 3860b5e..ded3b13 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestFlumeEventAvroEventSerializer.java
@@ -18,14 +18,7 @@
  */
 package org.apache.flume.serialization;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.charset.CharsetDecoder;
-
+import com.google.common.base.Charsets;
 import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
@@ -38,7 +31,13 @@ import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.CharsetDecoder;
 
 public class TestFlumeEventAvroEventSerializer {
 
@@ -77,17 +76,16 @@ public class TestFlumeEventAvroEventSerializer {
       throws FileNotFoundException, IOException {
     // Snappy currently broken on Mac in OpenJDK 7 per FLUME-2012
     Assume.assumeTrue(!"Mac OS X".equals(System.getProperty("os.name")) ||
-      !System.getProperty("java.version").startsWith("1.7."));
+                      !System.getProperty("java.version").startsWith("1.7."));
 
     createAvroFile(TESTFILE, "snappy");
     validateAvroFile(TESTFILE);
     FileUtils.forceDelete(TESTFILE);
   }
 
-  public void createAvroFile(File file, String codec)
-      throws FileNotFoundException, IOException {
+  public void createAvroFile(File file, String codec) throws 
FileNotFoundException, IOException {
 
-    if(file.exists()){
+    if (file.exists()) {
       FileUtils.forceDelete(file);
     }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
index 631bdfe..9f336eb 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestResettableFileInputStream.java
@@ -27,8 +27,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.junit.Assert.*;
-
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -39,15 +37,21 @@ import java.nio.charset.Charset;
 import java.nio.charset.MalformedInputException;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TestResettableFileInputStream {
 
   private static final boolean CLEANUP = true;
   private static final File WORK_DIR =
       new File("target/test/work").getAbsoluteFile();
-  private static final Logger logger = LoggerFactory.getLogger
-      (TestResettableFileInputStream.class);
+  private static final Logger logger =
+      LoggerFactory.getLogger(TestResettableFileInputStream.class);
 
-  private File file, meta;
+  private File file;
+  private File meta;
 
   @Before
   public void setup() throws Exception {
@@ -156,7 +160,8 @@ public class TestResettableFileInputStream {
   public void testUtf16BOMAndSurrogatePairRead() throws IOException {
     ByteArrayOutputStream out = new ByteArrayOutputStream();
     generateUtf16SurrogatePairSequence(out);
-    // buffer now contains 1 BOM and 2 chars (1 surrogate pair) and 6 bytes 
total (including 2-byte BOM)
+    // buffer now contains 1 BOM and 2 chars (1 surrogate pair) and 6 bytes 
total
+    // (including 2-byte BOM)
     Files.write(out.toByteArray(), file);
     ResettableInputStream in = initInputStream(8, Charsets.UTF_16, 
DecodeErrorPolicy.FAIL);
     String result = readLine(in, 2);
@@ -176,7 +181,8 @@ public class TestResettableFileInputStream {
     generateShiftJis2ByteSequence(out);
     // buffer now contains 8 chars and 10 bytes total
     Files.write(out.toByteArray(), file);
-    ResettableInputStream in = initInputStream(8, 
Charset.forName("Shift_JIS"), DecodeErrorPolicy.FAIL);
+    ResettableInputStream in = initInputStream(8, Charset.forName("Shift_JIS"),
+                                               DecodeErrorPolicy.FAIL);
     String result = readLine(in, 8);
     assertEquals("1234567\u4E9C\n", result);
   }
@@ -215,7 +221,7 @@ public class TestResettableFileInputStream {
     String javaVersionStr = System.getProperty("java.version");
     double javaVersion = Double.parseDouble(javaVersionStr.substring(0, 3));
 
-    if(javaVersion < 1.8) {
+    if (javaVersion < 1.8) {
       assertTrue(preJdk8ExpectedStr.replaceAll("X", 
"\ufffd").equals(sb.toString()));
     } else {
       assertTrue(expectedStr.replaceAll("X", "\ufffd").equals(sb.toString()));
@@ -508,8 +514,8 @@ public class TestResettableFileInputStream {
     return initInputStream(2048, Charsets.UTF_8, policy);
   }
 
-  private ResettableInputStream initInputStream(int bufferSize, Charset 
charset, DecodeErrorPolicy policy)
-      throws IOException {
+  private ResettableInputStream initInputStream(int bufferSize, Charset 
charset,
+                                                DecodeErrorPolicy policy) 
throws IOException {
     PositionTracker tracker = new DurablePositionTracker(meta, file.getPath());
     ResettableInputStream in = new ResettableFileInputStream(file, tracker,
         bufferSize, charset, policy);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java
 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java
index 7bd342a..ba9f4ab 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestSyslogAvroEventSerializer.java
@@ -75,7 +75,7 @@ public class TestSyslogAvroEventSerializer {
   public void test() throws FileNotFoundException, IOException {
     // Snappy currently broken on Mac in OpenJDK 7 per FLUME-2012
     Assume.assumeTrue(!"Mac OS X".equals(System.getProperty("os.name")) ||
-      !System.getProperty("java.version").startsWith("1.7."));
+                      !System.getProperty("java.version").startsWith("1.7."));
 
     //Schema schema = new Schema.Parser().parse(schemaFile);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
index 757a536..0f3f9ec 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestAvroSink.java
@@ -19,28 +19,10 @@
 
 package org.apache.flume.sink;
 
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicLong;
-
 import com.google.common.base.Charsets;
-import java.io.FileInputStream;
-import java.security.KeyStore;
-import java.security.Security;
-import java.util.concurrent.Executors;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
 import org.apache.avro.AvroRemoteException;
 import org.apache.avro.ipc.NettyServer;
-import org.apache.avro.ipc.NettyTransceiver;
 import org.apache.avro.ipc.Server;
-import org.apache.avro.ipc.specific.SpecificRequestor;
 import org.apache.avro.ipc.specific.SpecificResponder;
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelSelector;
@@ -49,8 +31,8 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.Sink;
 import org.apache.flume.Transaction;
-import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.api.RpcClient;
+import org.apache.flume.channel.ChannelProcessor;
 import org.apache.flume.channel.MemoryChannel;
 import org.apache.flume.channel.ReplicatingChannelSelector;
 import org.apache.flume.conf.Configurables;
@@ -61,19 +43,30 @@ import org.apache.flume.source.AvroSource;
 import org.apache.flume.source.avro.AvroFlumeEvent;
 import org.apache.flume.source.avro.AvroSourceProtocol;
 import org.apache.flume.source.avro.Status;
-import org.jboss.netty.channel.ChannelException;
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
 import org.jboss.netty.handler.ssl.SslHandler;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.Charset;
+import java.security.KeyStore;
+import java.security.Security;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
 public class TestAvroSink {
 
   private static final Logger logger = LoggerFactory
@@ -90,7 +83,9 @@ public class TestAvroSink {
   }
 
   public void setUp(String compressionType, int compressionLevel) {
-    if (sink != null) { throw new RuntimeException("double setup");}
+    if (sink != null) {
+      throw new RuntimeException("double setup");
+    }
     sink = new AvroSink();
     channel = new MemoryChannel();
 
@@ -607,8 +602,9 @@ public class TestAvroSink {
   }
 
   @Test
-  public void testSslSinkWithNonTrustedCert() throws InterruptedException,
-      EventDeliveryException, InstantiationException, IllegalAccessException {
+  public void testSslSinkWithNonTrustedCert()
+      throws InterruptedException, EventDeliveryException, 
InstantiationException,
+             IllegalAccessException {
     setUp();
     Event event = EventBuilder.withBody("test event 1", Charsets.UTF_8);
     Server server = createSslServer(new MockAvroServer());
@@ -662,37 +658,38 @@ public class TestAvroSink {
   }
 
   @Test
-  public void testRequestWithNoCompression() throws InterruptedException, 
IOException, EventDeliveryException {
-
+  public void testRequestWithNoCompression()
+      throws InterruptedException, IOException, EventDeliveryException {
     doRequest(false, false, 6);
   }
 
   @Test
-  public void testRequestWithCompressionOnClientAndServerOnLevel0() throws 
InterruptedException, IOException, EventDeliveryException {
-
+  public void testRequestWithCompressionOnClientAndServerOnLevel0()
+      throws InterruptedException, IOException, EventDeliveryException {
     doRequest(true, true, 0);
   }
 
   @Test
-  public void testRequestWithCompressionOnClientAndServerOnLevel1() throws 
InterruptedException, IOException, EventDeliveryException {
-
+  public void testRequestWithCompressionOnClientAndServerOnLevel1()
+      throws InterruptedException, IOException, EventDeliveryException {
     doRequest(true, true, 1);
   }
 
   @Test
-  public void testRequestWithCompressionOnClientAndServerOnLevel6() throws 
InterruptedException, IOException, EventDeliveryException {
-
+  public void testRequestWithCompressionOnClientAndServerOnLevel6()
+      throws InterruptedException, IOException, EventDeliveryException {
     doRequest(true, true, 6);
   }
 
   @Test
-  public void testRequestWithCompressionOnClientAndServerOnLevel9() throws 
InterruptedException, IOException, EventDeliveryException {
-
+  public void testRequestWithCompressionOnClientAndServerOnLevel9()
+      throws InterruptedException, IOException, EventDeliveryException {
     doRequest(true, true, 9);
   }
 
-  private void doRequest(boolean serverEnableCompression, boolean 
clientEnableCompression, int compressionLevel) throws InterruptedException, 
IOException, EventDeliveryException {
-
+  private void doRequest(boolean serverEnableCompression, boolean 
clientEnableCompression,
+                         int compressionLevel)
+      throws InterruptedException, IOException, EventDeliveryException {
     if (clientEnableCompression) {
       setUp("deflate", compressionLevel);
     } else {
@@ -732,15 +729,12 @@ public class TestAvroSink {
 
     source.start();
 
-    Assert
-        .assertTrue("Reached start or error", LifecycleController.waitForOneOf(
-            source, LifecycleState.START_OR_ERROR));
-    Assert.assertEquals("Server is started", LifecycleState.START,
-        source.getLifecycleState());
-
+    Assert.assertTrue("Reached start or error",
+                      LifecycleController.waitForOneOf(source, 
LifecycleState.START_OR_ERROR));
+    Assert.assertEquals("Server is started",
+                        LifecycleState.START, source.getLifecycleState());
 
-    Event event = EventBuilder.withBody("Hello avro",
-        Charset.forName("UTF8"));
+    Event event = EventBuilder.withBody("Hello avro", Charset.forName("UTF8"));
 
     sink.start();
 
@@ -858,7 +852,8 @@ public class TestAvroSink {
     public SSLChannelPipelineFactory() {
     }
 
-    public SSLChannelPipelineFactory(String keystore, String keystorePassword, 
String keystoreType) {
+    public SSLChannelPipelineFactory(String keystore, String keystorePassword,
+                                     String keystoreType) {
       this.keystore = keystore;
       this.keystorePassword = keystorePassword;
       this.keystoreType = keystoreType;

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
index 835f541..cf6cbbc 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestDefaultSinkFactory.java
@@ -37,7 +37,6 @@ public class TestDefaultSinkFactory {
   @Test
   public void testDuplicateCreate() {
 
-
     Sink avroSink1 = sinkFactory.create("avroSink1", "avro");
     Sink avroSink2 = sinkFactory.create("avroSink2", "avro");
 
@@ -55,7 +54,7 @@ public class TestDefaultSinkFactory {
   }
 
   private void verifySinkCreation(String name, String type, Class<?> typeClass)
-    throws Exception {
+      throws Exception {
     Sink sink = sinkFactory.create(name, type);
     Assert.assertNotNull(sink);
     Assert.assertTrue(typeClass.isInstance(sink));

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
index 3358cf4..8882056 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestFailoverSinkProcessor.java
@@ -94,7 +94,7 @@ public class TestFailoverSinkProcessor {
 
     @Override
     public Status process() throws EventDeliveryException {
-      synchronized(this) {
+      synchronized (this) {
         if (remaining <= 0) {
           throw new EventDeliveryException("can't consume more");
         }
@@ -107,7 +107,7 @@ public class TestFailoverSinkProcessor {
       tx.close();
 
       if (e != null) {
-        synchronized(this) {
+        synchronized (this) {
           remaining--;
         }
         written++;
@@ -167,7 +167,7 @@ public class TestFailoverSinkProcessor {
     Assert.assertEquals(LifecycleState.START, s1.getLifecycleState());
     Assert.assertEquals(LifecycleState.START, s2.getLifecycleState());
     Assert.assertEquals(LifecycleState.START, s3.getLifecycleState());
-    for(int i = 0; i < 15; i++) {
+    for (int i = 0; i < 15; i++) {
       Transaction tx = ch.getTransaction();
       tx.begin();
       ch.put(EventBuilder.withBody("test".getBytes()));
@@ -178,7 +178,7 @@ public class TestFailoverSinkProcessor {
 
     Assert.assertEquals(new Integer(10), s1.getWritten());
     Assert.assertEquals(new Integer(5), s2.getWritten());
-    for(int i = 0; i < 50; i++) {
+    for (int i = 0; i < 50; i++) {
       Transaction tx = ch.getTransaction();
       tx.begin();
       ch.put(EventBuilder.withBody("test".getBytes()));
@@ -195,7 +195,7 @@ public class TestFailoverSinkProcessor {
     // get us past the retry time for the failed sink
     Thread.sleep(5000);
 
-    for(int i = 0; i < 100; i++) {
+    for (int i = 0; i < 100; i++) {
       Transaction tx = ch.getTransaction();
       tx.begin();
       ch.put(EventBuilder.withBody("test".getBytes()));

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
index 7d95655..011d2d1 100644
--- 
a/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
+++ 
b/flume-ng-core/src/test/java/org/apache/flume/sink/TestLoadBalancingSinkProcessor.java
@@ -17,16 +17,7 @@
  */
 package org.apache.flume.sink;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import junit.framework.Assert;
-
 import org.apache.flume.Channel;
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
@@ -38,6 +29,14 @@ import org.apache.flume.Transaction;
 import org.apache.flume.channel.AbstractChannel;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 public class TestLoadBalancingSinkProcessor {
 
   private Context getContext(String selectorType, boolean backoff) {
@@ -62,8 +61,7 @@ public class TestLoadBalancingSinkProcessor {
     return getProcessor(sinks, getContext(selectorType, backoff));
   }
 
-  private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context 
ctx)
-  {
+  private LoadBalancingSinkProcessor getProcessor(List<Sink> sinks, Context 
ctx) {
     LoadBalancingSinkProcessor lbsp = new LoadBalancingSinkProcessor();
     lbsp.setSinks(sinks);
     lbsp.configure(ctx);
@@ -77,7 +75,7 @@ public class TestLoadBalancingSinkProcessor {
     // If no selector is specified, the round-robin selector should be used
     Channel ch = new MockChannel();
     int n = 100;
-    int numEvents = 3*n;
+    int numEvents = 3 * n;
     for (int i = 0; i < numEvents; i++) {
       ch.put(new MockEvent("test" + i));
     }
@@ -185,7 +183,7 @@ public class TestLoadBalancingSinkProcessor {
     // TODO: there is a remote possibility that s0 or s2
     // never get hit by the random assignment
     // and thus not backoffed, causing the test to fail
-    for(int i=0; i < 50; i++) {
+    for (int i = 0; i < 50; i++) {
       // a well behaved runner would always check the return.
       lbsp.process();
     }
@@ -214,7 +212,7 @@ public class TestLoadBalancingSinkProcessor {
   public void testRandomPersistentFailure() throws Exception {
     Channel ch = new MockChannel();
     int n = 100;
-    int numEvents = 3*n;
+    int numEvents = 3 * n;
     for (int i = 0; i < numEvents; i++) {
       ch.put(new MockEvent("test" + i));
     }
@@ -244,7 +242,7 @@ public class TestLoadBalancingSinkProcessor {
     }
 
     Assert.assertTrue(s2.getEvents().size() == 0);
-    Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3*n);
+    Assert.assertTrue(s1.getEvents().size() + s3.getEvents().size() == 3 * n);
   }
 
   @Test
@@ -325,8 +323,6 @@ public class TestLoadBalancingSinkProcessor {
     Assert.assertTrue("Miraculous distribution", sizeSet.size() > 1);
   }
 
-
-
   @Test
   public void testRoundRobinOneActiveSink() throws Exception {
     Channel ch = new MockChannel();
@@ -373,7 +369,7 @@ public class TestLoadBalancingSinkProcessor {
   public void testRoundRobinPersistentFailure() throws Exception {
     Channel ch = new MockChannel();
     int n = 100;
-    int numEvents = 3*n;
+    int numEvents = 3 * n;
     for (int i = 0; i < numEvents; i++) {
       ch.put(new MockEvent("test" + i));
     }
@@ -404,7 +400,7 @@ public class TestLoadBalancingSinkProcessor {
 
     Assert.assertTrue(s1.getEvents().size() == n);
     Assert.assertTrue(s2.getEvents().size() == 0);
-    Assert.assertTrue(s3.getEvents().size() == 2*n);
+    Assert.assertTrue(s3.getEvents().size() == 2 * n);
   }
 
   // test that even if the sink recovers immediately that it is kept out of 
commission briefly
@@ -413,7 +409,7 @@ public class TestLoadBalancingSinkProcessor {
   public void testRoundRobinBackoffInitialFailure() throws 
EventDeliveryException {
     Channel ch = new MockChannel();
     int n = 100;
-    int numEvents = 3*n;
+    int numEvents = 3 * n;
     for (int i = 0; i < numEvents; i++) {
       ch.put(new MockEvent("test" + i));
     }
@@ -424,7 +420,7 @@ public class TestLoadBalancingSinkProcessor {
     MockSink s2 = new MockSink(2);
     s2.setChannel(ch);
 
-      MockSink s3 = new MockSink(3);
+    MockSink s3 = new MockSink(3);
     s3.setChannel(ch);
 
     List<Sink> sinks = new ArrayList<Sink>();
@@ -449,14 +445,15 @@ public class TestLoadBalancingSinkProcessor {
 
     Assert.assertEquals((3 * n) / 2, s1.getEvents().size());
     Assert.assertEquals(1, s2.getEvents().size());
-    Assert.assertEquals((3 * n) /2 - 1, s3.getEvents().size());
+    Assert.assertEquals((3 * n) / 2 - 1, s3.getEvents().size());
   }
 
   @Test
-  public void testRoundRobinBackoffIncreasingBackoffs() throws 
EventDeliveryException, InterruptedException {
+  public void testRoundRobinBackoffIncreasingBackoffs()
+      throws EventDeliveryException, InterruptedException {
     Channel ch = new MockChannel();
     int n = 100;
-    int numEvents = 3*n;
+    int numEvents = 3 * n;
     for (int i = 0; i < numEvents; i++) {
       ch.put(new MockEvent("test" + i));
     }
@@ -468,7 +465,7 @@ public class TestLoadBalancingSinkProcessor {
     s2.setChannel(ch);
     s2.setFail(true);
 
-      MockSink s3 = new MockSink(3);
+    MockSink s3 = new MockSink(3);
     s3.setChannel(ch);
 
     List<Sink> sinks = new ArrayList<Sink>();
@@ -508,10 +505,11 @@ public class TestLoadBalancingSinkProcessor {
   }
 
   @Test
-  public void testRoundRobinBackoffFailureRecovery() throws 
EventDeliveryException, InterruptedException {
+  public void testRoundRobinBackoffFailureRecovery()
+      throws EventDeliveryException, InterruptedException {
     Channel ch = new MockChannel();
     int n = 100;
-    int numEvents = 3*n;
+    int numEvents = 3 * n;
     for (int i = 0; i < numEvents; i++) {
       ch.put(new MockEvent("test" + i));
     }
@@ -523,7 +521,7 @@ public class TestLoadBalancingSinkProcessor {
     s2.setChannel(ch);
     s2.setFail(true);
 
-      MockSink s3 = new MockSink(3);
+    MockSink s3 = new MockSink(3);
     s3.setChannel(ch);
 
     List<Sink> sinks = new ArrayList<Sink>();
@@ -548,13 +546,12 @@ public class TestLoadBalancingSinkProcessor {
     Assert.assertEquals(n, s3.getEvents().size());
   }
 
-
   @Test
   public void testRoundRobinNoFailure() throws Exception {
 
     Channel ch = new MockChannel();
     int n = 100;
-    int numEvents = 3*n;
+    int numEvents = 3 * n;
     for (int i = 0; i < numEvents; i++) {
       ch.put(new MockEvent("test" + i));
     }
@@ -656,8 +653,9 @@ public class TestLoadBalancingSinkProcessor {
         throw new EventDeliveryException("failed");
       }
       Event e = this.getChannel().take();
-      if (e == null)
+      if (e == null) {
         return Status.BACKOFF;
+      }
 
       events.add(e);
       return Status.READY;

Reply via email to