http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
index a58f344..69acc06 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java
@@ -18,25 +18,7 @@
  */
 package org.apache.flume.sink.elasticsearch;
 
-import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
-import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
-import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
-import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
-import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
-import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
-import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertNull;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.TimeZone;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang.time.FastDateFormat;
-
 import org.apache.flume.Channel;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
@@ -56,6 +38,24 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.BATCH_SIZE;
+import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLUSTER_NAME;
+import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.HOSTNAMES;
+import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_NAME;
+import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.INDEX_TYPE;
+import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER;
+import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 public class TestElasticSearchSink extends AbstractElasticSearchSinkTest {
 
   private ElasticSearchSink fixture;
@@ -373,23 +373,22 @@ public class TestElasticSearchSink extends 
AbstractElasticSearchSinkTest {
   public static final class CustomElasticSearchIndexRequestBuilderFactory
       extends AbstractElasticSearchIndexRequestBuilderFactory {
 
-    static String actualIndexName, actualIndexType;
+    static String actualIndexName;
+    static String actualIndexType;
     static byte[] actualEventBody;
     static boolean hasContext;
 
     public CustomElasticSearchIndexRequestBuilderFactory() {
-      super(FastDateFormat.getInstance("HH_mm_ss_SSS",
-          TimeZone.getTimeZone("EST5EDT")));
+      super(FastDateFormat.getInstance("HH_mm_ss_SSS", 
TimeZone.getTimeZone("EST5EDT")));
     }
 
     @Override
-    protected void prepareIndexRequest(IndexRequestBuilder indexRequest,
-        String indexName, String indexType, Event event) throws IOException {
+    protected void prepareIndexRequest(IndexRequestBuilder indexRequest, 
String indexName,
+                                       String indexType, Event event) throws 
IOException {
       actualIndexName = indexName;
       actualIndexType = indexType;
       actualEventBody = event.getBody();
-      indexRequest.setIndex(indexName).setType(indexType)
-          .setSource(event.getBody());
+      
indexRequest.setIndex(indexName).setType(indexType).setSource(event.getBody());
     }
 
     @Override
@@ -458,7 +457,8 @@ public class TestElasticSearchSink extends 
AbstractElasticSearchSinkTest {
 class FakeEventSerializer implements ElasticSearchEventSerializer {
 
   static final byte[] FAKE_BYTES = new byte[] { 9, 8, 7, 6 };
-  boolean configuredWithContext, configuredWithComponentConfiguration;
+  boolean configuredWithContext;
+  boolean configuredWithComponentConfiguration;
 
   @Override
   public BytesStream getContentBuilder(Event event) throws IOException {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java
index b5a4d2f..2a36439 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSinkCreation.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 public class TestElasticSearchSinkCreation {
 
-private SinkFactory sinkFactory;
+  private SinkFactory sinkFactory;
 
   @Before
   public void setUp() {

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
index 38e7399..0d1d092 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/RoundRobinListTest.java
@@ -17,10 +17,11 @@
 package org.apache.flume.sink.elasticsearch.client;
 
 import java.util.Arrays;
-import static org.junit.Assert.assertEquals;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 public class RoundRobinListTest {
 
   private RoundRobinList<String> fixture;

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
index 4b70b65..c3f07b0 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchClientFactory.java
@@ -21,10 +21,10 @@ package org.apache.flume.sink.elasticsearch.client;
 import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
 
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertThat;
-import org.mockito.Mock;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 public class TestElasticSearchClientFactory {
@@ -44,7 +44,7 @@ public class TestElasticSearchClientFactory {
   public void shouldReturnTransportClient() throws Exception {
     String[] hostNames = { "127.0.0.1" };
     Object o = factory.getClient(ElasticSearchClientFactory.TransportClient,
-            hostNames, "test", serializer, null);
+                                 hostNames, "test", serializer, null);
     assertThat(o, instanceOf(ElasticSearchTransportClient.class));
   }
 
@@ -52,13 +52,13 @@ public class TestElasticSearchClientFactory {
   public void shouldReturnRestClient() throws NoSuchClientTypeException {
     String[] hostNames = { "127.0.0.1" };
     Object o = factory.getClient(ElasticSearchClientFactory.RestClient,
-            hostNames, "test", serializer, null);
+                                 hostNames, "test", serializer, null);
     assertThat(o, instanceOf(ElasticSearchRestClient.class));
   }
 
-  @Test(expected=NoSuchClientTypeException.class)
+  @Test(expected = NoSuchClientTypeException.class)
   public void shouldThrowNoSuchClientTypeException() throws 
NoSuchClientTypeException {
-    String[] hostNames = {"127.0.0.1"};
+    String[] hostNames = { "127.0.0.1" };
     factory.getClient("not_existing_client", hostNames, "test", null, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
index 1fe983a..9551c81 100644
--- 
a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
+++ 
b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/client/TestElasticSearchRestClient.java
@@ -25,6 +25,15 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
 import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.HttpClient;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.BytesStream;
 import org.junit.Before;
@@ -38,16 +47,12 @@ import java.util.List;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.HttpStatus;
-import org.apache.http.StatusLine;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.client.methods.HttpUriRequest;
-import org.apache.http.util.EntityUtils;
-import org.elasticsearch.common.bytes.BytesArray;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.mockito.MockitoAnnotations.initMocks;
 
 public class TestElasticSearchRestClient {
@@ -126,15 +131,16 @@ public class TestElasticSearchRestClient {
     verify(httpClient).execute(argument.capture());
 
     assertEquals("http://host1/_bulk";, 
argument.getValue().getURI().toString());
-    
assertTrue(verifyJsonEvents("{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n",
-            MESSAGE_CONTENT, 
EntityUtils.toString(argument.getValue().getEntity())));
+    assertTrue(verifyJsonEvents(
+        
"{\"index\":{\"_type\":\"bar_type\",\"_index\":\"foo_index\",\"_ttl\":\"123\"}}\n",
+        MESSAGE_CONTENT, 
EntityUtils.toString(argument.getValue().getEntity())));
   }
 
   private boolean verifyJsonEvents(String expectedIndex, String expectedBody, 
String actual) {
     Iterator<String> it = Splitter.on("\n").split(actual).iterator();
     JsonParser parser = new JsonParser();
     JsonObject[] arr = new JsonObject[2];
-    for(int i = 0; i < 2; i++) {
+    for (int i = 0; i < 2; i++) {
       arr[i] = (JsonObject) parser.parse(it.next());
     }
     return arr[0].equals(parser.parse(expectedIndex)) && 
arr[1].equals(parser.parse(expectedBody));
@@ -156,7 +162,8 @@ public class TestElasticSearchRestClient {
   public void shouldRetryBulkOperation() throws Exception {
     ArgumentCaptor<HttpPost> argument = 
ArgumentCaptor.forClass(HttpPost.class);
 
-    
when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR,
 HttpStatus.SC_OK);
+    
when(httpStatus.getStatusCode()).thenReturn(HttpStatus.SC_INTERNAL_SERVER_ERROR,
+                                                HttpStatus.SC_OK);
     when(httpResponse.getStatusLine()).thenReturn(httpStatus);
     
when(httpClient.execute(any(HttpUriRequest.class))).thenReturn(httpResponse);
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
index b8aefe8..9a2be5a 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/IncrementAsyncHBaseSerializer.java
@@ -37,6 +37,7 @@ public class IncrementAsyncHBaseSerializer implements 
AsyncHbaseEventSerializer
   private byte[] cf;
   private byte[] column;
   private Event currentEvent;
+
   @Override
   public void initialize(byte[] table, byte[] cf) {
     this.table = table;
@@ -55,10 +56,9 @@ public class IncrementAsyncHBaseSerializer implements 
AsyncHbaseEventSerializer
 
   @Override
   public List<AtomicIncrementRequest> getIncrements() {
-    List<AtomicIncrementRequest> incrs
-      = new ArrayList<AtomicIncrementRequest>();
+    List<AtomicIncrementRequest> incrs = new 
ArrayList<AtomicIncrementRequest>();
     AtomicIncrementRequest incr = new AtomicIncrementRequest(table,
-      currentEvent.getBody(), cf, column, 1);
+        currentEvent.getBody(), cf, column, 1);
     incrs.add(incr);
     return incrs;
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
index b4bbd6b..f8faa1e 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestAsyncHBaseSink.java
@@ -176,7 +176,7 @@ public class TestAsyncHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -189,9 +189,9 @@ public class TestAsyncHBaseSink {
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 3; i++){
-      for(int j = 0; j < 3; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -209,8 +209,7 @@ public class TestAsyncHBaseSink {
   public void testTimeOut() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;
-    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
-      true, false);
+    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), 
true, false);
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, ctx);
@@ -219,7 +218,7 @@ public class TestAsyncHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -245,7 +244,7 @@ public class TestAsyncHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -253,7 +252,7 @@ public class TestAsyncHBaseSink {
     tx.close();
     int count = 0;
     Status status = Status.READY;
-    while(status != Status.BACKOFF){
+    while (status != Status.BACKOFF) {
       count++;
       status = sink.process();
     }
@@ -264,9 +263,9 @@ public class TestAsyncHBaseSink {
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 3; i++){
-      for(int j = 0; j < 3; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -278,26 +277,21 @@ public class TestAsyncHBaseSink {
   }
 
   @Test
-  public void testMultipleBatchesBatchIncrementsWithCoalescing()
-    throws Exception {
+  public void testMultipleBatchesBatchIncrementsWithCoalescing() throws 
Exception {
     doTestMultipleBatchesBatchIncrements(true);
   }
 
   @Test
-  public void testMultipleBatchesBatchIncrementsNoCoalescing()
-    throws Exception {
+  public void testMultipleBatchesBatchIncrementsNoCoalescing() throws 
Exception {
     doTestMultipleBatchesBatchIncrements(false);
   }
 
-  public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws
-    Exception {
+  public void doTestMultipleBatchesBatchIncrements(boolean coalesce) throws 
Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;
-    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
-      false, true);
+    AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(), 
false, true);
     if (coalesce) {
-      ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
-        "true");
+      ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS, 
"true");
     }
     ctx.put("batchSize", "2");
     ctx.put("serializer", IncrementAsyncHBaseSerializer.class.getName());
@@ -309,7 +303,7 @@ public class TestAsyncHBaseSink {
     ctx.put("serializer", SimpleAsyncHbaseEventSerializer.class.getName());
     //Restore the no coalescing behavior
     ctx.put(HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
-      "false");
+            "false");
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, ctx);
     sink.setChannel(channel);
@@ -335,7 +329,7 @@ public class TestAsyncHBaseSink {
     Assert.assertEquals(7, count);
     HTable table = new HTable(testUtility.getConfiguration(), tableName);
     Scan scan = new Scan();
-    scan.addColumn(columnFamily.getBytes(),"test".getBytes());
+    scan.addColumn(columnFamily.getBytes(), "test".getBytes());
     scan.setStartRow(Bytes.toBytes(valBase));
     ResultScanner rs = table.getScanner(scan);
     int i = 0;
@@ -358,19 +352,19 @@ public class TestAsyncHBaseSink {
   }
 
   @Test
-  public void testWithoutConfigurationObject() throws Exception{
+  public void testWithoutConfigurationObject() throws Exception {
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;
     ctx.put("batchSize", "2");
     ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
-            ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()) 
);
+            ZKConfig.getZKQuorumServersString(testUtility.getConfiguration()));
     ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-      testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+            
testUtility.getConfiguration().get(HConstants.ZOOKEEPER_ZNODE_PARENT));
     AsyncHBaseSink sink = new AsyncHBaseSink();
     Configurables.configure(sink, ctx);
     // Reset context to values usable by other tests.
     ctx.put(HBaseSinkConfigurationConstants.ZK_QUORUM, null);
-    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,null);
+    ctx.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT, null);
     ctx.put("batchSize", "100");
     Channel channel = new MemoryChannel();
     Configurables.configure(channel, ctx);
@@ -378,7 +372,7 @@ public class TestAsyncHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -386,7 +380,7 @@ public class TestAsyncHBaseSink {
     tx.close();
     int count = 0;
     Status status = Status.READY;
-    while(status != Status.BACKOFF){
+    while (status != Status.BACKOFF) {
       count++;
       status = sink.process();
     }
@@ -401,9 +395,9 @@ public class TestAsyncHBaseSink {
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 3; i++){
-      for(int j = 0; j < 3; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -428,7 +422,7 @@ public class TestAsyncHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -440,9 +434,9 @@ public class TestAsyncHBaseSink {
     byte[][] results = getResults(table, 2);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 2; i++){
-      for(int j = 0; j < 2; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -457,7 +451,7 @@ public class TestAsyncHBaseSink {
 
   // We only have support for getting File Descriptor count for Unix from the 
JDK
   private long getOpenFileDescriptorCount() {
-    if(os instanceof UnixOperatingSystemMXBean){
+    if (os instanceof UnixOperatingSystemMXBean) {
       return ((UnixOperatingSystemMXBean) os).getOpenFileDescriptorCount();
     } else {
       return -1;
@@ -476,13 +470,13 @@ public class TestAsyncHBaseSink {
    */
   @Test
   public void testFDLeakOnShutdown() throws Exception {
-    if(getOpenFileDescriptorCount() < 0) {
+    if (getOpenFileDescriptorCount() < 0) {
       return;
     }
     testUtility.createTable(tableName.getBytes(), columnFamily.getBytes());
     deleteTable = true;
     AsyncHBaseSink sink = new AsyncHBaseSink(testUtility.getConfiguration(),
-            true, false);
+                                             true, false);
     ctx.put("maxConsecutiveFails", "1");
     Configurables.configure(sink, ctx);
     Channel channel = new MemoryChannel();
@@ -492,7 +486,7 @@ public class TestAsyncHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -503,7 +497,7 @@ public class TestAsyncHBaseSink {
 
     // Since the isTimeOutTest is set to true, transaction will fail
     // with EventDeliveryException
-    for(int i = 0; i < 10; i ++) {
+    for (int i = 0; i < 10; i++) {
       try {
         sink.process();
       } catch (EventDeliveryException ex) {
@@ -511,18 +505,20 @@ public class TestAsyncHBaseSink {
     }
     long increaseInFD = getOpenFileDescriptorCount() - initialFDCount;
     Assert.assertTrue("File Descriptor leak detected. FDs have increased by " +
-      increaseInFD + " from an initial FD count of " + initialFDCount,  
increaseInFD < 50);
+                      increaseInFD + " from an initial FD count of " + 
initialFDCount,
+                      increaseInFD < 50);
   }
 
   /**
    * This test must run last - it shuts down the minicluster :D
+   *
    * @throws Exception
    */
   @Ignore("For dev builds only:" +
-      "This test takes too long, and this has to be run after all other" +
-      "tests, since it shuts down the minicluster. " +
-      "Comment out all other tests" +
-      "and uncomment this annotation to run this test.")
+          "This test takes too long, and this has to be run after all other" +
+          "tests, since it shuts down the minicluster. " +
+          "Comment out all other tests" +
+          "and uncomment this annotation to run this test.")
   @Test(expected = EventDeliveryException.class)
   public void testHBaseFailure() throws Exception {
     ctx.put("batchSize", "2");
@@ -538,7 +534,7 @@ public class TestAsyncHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -550,9 +546,9 @@ public class TestAsyncHBaseSink {
     byte[][] results = getResults(table, 2);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 2; i++){
-      for(int j = 0; j < 2; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -565,21 +561,23 @@ public class TestAsyncHBaseSink {
     sink.process();
     sink.stop();
   }
+
   /**
    * Makes Hbase scans to get rows in the payload column and increment column
    * in the table given. Expensive, so tread lightly.
    * Calling this function multiple times for the same result set is a bad
    * idea. Cache the result set once it is returned by this function.
+   *
    * @param table
    * @param numEvents Number of events inserted into the table
    * @return
    * @throws IOException
    */
-  private byte[][] getResults(HTable table, int numEvents) throws IOException{
-    byte[][] results = new byte[numEvents+1][];
+  private byte[][] getResults(HTable table, int numEvents) throws IOException {
+    byte[][] results = new byte[numEvents + 1][];
     Scan scan = new Scan();
-    scan.addColumn(columnFamily.getBytes(),plCol.getBytes());
-    scan.setStartRow( Bytes.toBytes("default"));
+    scan.addColumn(columnFamily.getBytes(), plCol.getBytes());
+    scan.setStartRow(Bytes.toBytes("default"));
     ResultScanner rs = table.getScanner(scan);
     byte[] out = null;
     int i = 0;
@@ -587,10 +585,10 @@ public class TestAsyncHBaseSink {
       for (Result r = rs.next(); r != null; r = rs.next()) {
         out = r.getValue(columnFamily.getBytes(), plCol.getBytes());
 
-        if(i >= results.length - 1){
+        if (i >= results.length - 1) {
           rs.close();
           throw new FlumeException("More results than expected in the table." +
-              "Expected = " + numEvents +". Found = " + i);
+                                   "Expected = " + numEvents + ". Found = " + 
i);
         }
         results[i++] = out;
         System.out.println(out);
@@ -601,7 +599,7 @@ public class TestAsyncHBaseSink {
 
     Assert.assertEquals(i, results.length - 1);
     scan = new Scan();
-    scan.addColumn(columnFamily.getBytes(),inColumn.getBytes());
+    scan.addColumn(columnFamily.getBytes(), inColumn.getBytes());
     scan.setStartRow(Bytes.toBytes("incRow"));
     rs = table.getScanner(scan);
     out = null;

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
index ab65a38..217913b 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestHBaseSink.java
@@ -18,14 +18,6 @@
  */
 package org.apache.flume.sink.hbase;
 
-import static org.mockito.Mockito.*;
-
-import java.io.IOException;
-import java.lang.reflect.Method;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
 import com.google.common.base.Charsets;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
@@ -54,14 +46,25 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
 public class TestHBaseSink {
   private static final Logger logger =
       LoggerFactory.getLogger(TestHBaseSink.class);
@@ -140,8 +143,7 @@ public class TestHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    Event e = EventBuilder.withBody(
-            Bytes.toBytes(valBase));
+    Event e = EventBuilder.withBody(Bytes.toBytes(valBase));
     channel.put(e);
     tx.commit();
     tx.close();
@@ -195,7 +197,7 @@ public class TestHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -207,9 +209,9 @@ public class TestHBaseSink {
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 3; i++){
-      for(int j = 0; j < 3; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -234,14 +236,14 @@ public class TestHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
     tx.commit();
     tx.close();
     int count = 0;
-    while(sink.process() != Status.BACKOFF){
+    while (sink.process() != Status.BACKOFF) {
       count++;
     }
     sink.stop();
@@ -250,9 +252,9 @@ public class TestHBaseSink {
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 3; i++){
-      for(int j = 0; j < 3; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -284,7 +286,7 @@ public class TestHBaseSink {
     logger.info("Writing data into channel");
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -313,9 +315,9 @@ public class TestHBaseSink {
     byte[][] results = getResults(table, 2);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 2; i++){
-      for(int j = 0; j < 2; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -328,15 +330,17 @@ public class TestHBaseSink {
   }
 
   // TODO: Move this test to a different class and run it stand-alone.
+
   /**
    * This test must run last - it shuts down the minicluster :D
+   *
    * @throws Exception
    */
   @Ignore("For dev builds only:" +
-      "This test takes too long, and this has to be run after all other" +
-      "tests, since it shuts down the minicluster. " +
-      "Comment out all other tests" +
-      "and uncomment this annotation to run this test.")
+          "This test takes too long, and this has to be run after all other" +
+          "tests, since it shuts down the minicluster. " +
+          "Comment out all other tests" +
+          "and uncomment this annotation to run this test.")
   @Test(expected = EventDeliveryException.class)
   public void testHBaseFailure() throws Exception {
     initContextForSimpleHbaseEventSerializer();
@@ -351,7 +355,7 @@ public class TestHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
@@ -362,9 +366,9 @@ public class TestHBaseSink {
     byte[][] results = getResults(table, 2);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 2; i++){
-      for(int j = 0; j < 2; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 2; i++) {
+      for (int j = 0; j < 2; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -378,22 +382,22 @@ public class TestHBaseSink {
     sink.stop();
   }
 
-
   /**
    * Makes Hbase scans to get rows in the payload column and increment column
    * in the table given. Expensive, so tread lightly.
    * Calling this function multiple times for the same result set is a bad
    * idea. Cache the result set once it is returned by this function.
+   *
    * @param table
    * @param numEvents Number of events inserted into the table
    * @return
    * @throws IOException
    */
-  private byte[][] getResults(HTable table, int numEvents) throws IOException{
-    byte[][] results = new byte[numEvents+1][];
+  private byte[][] getResults(HTable table, int numEvents) throws IOException {
+    byte[][] results = new byte[numEvents + 1][];
     Scan scan = new Scan();
-    scan.addColumn(columnFamily.getBytes(),plCol.getBytes());
-    scan.setStartRow( Bytes.toBytes("default"));
+    scan.addColumn(columnFamily.getBytes(), plCol.getBytes());
+    scan.setStartRow(Bytes.toBytes("default"));
     ResultScanner rs = table.getScanner(scan);
     byte[] out = null;
     int i = 0;
@@ -401,10 +405,10 @@ public class TestHBaseSink {
       for (Result r = rs.next(); r != null; r = rs.next()) {
         out = r.getValue(columnFamily.getBytes(), plCol.getBytes());
 
-        if(i >= results.length - 1){
+        if (i >= results.length - 1) {
           rs.close();
           throw new FlumeException("More results than expected in the table." +
-              "Expected = " + numEvents +". Found = " + i);
+                                   "Expected = " + numEvents + ". Found = " + 
i);
         }
         results[i++] = out;
         System.out.println(out);
@@ -415,7 +419,7 @@ public class TestHBaseSink {
 
     Assert.assertEquals(i, results.length - 1);
     scan = new Scan();
-    scan.addColumn(columnFamily.getBytes(),inColumn.getBytes());
+    scan.addColumn(columnFamily.getBytes(), inColumn.getBytes());
     scan.setStartRow(Bytes.toBytes("incRow"));
     rs = table.getScanner(scan);
     out = null;
@@ -472,7 +476,7 @@ public class TestHBaseSink {
     initContextForSimpleHbaseEventSerializer();
     ctx.put("batchSize", "1");
     ctx.put(HBaseSinkConfigurationConstants.CONFIG_SERIALIZER,
-        "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
+            "org.apache.flume.sink.hbase.MockSimpleHbaseEventSerializer");
 
     HBaseSink sink = new HBaseSink(conf);
     Configurables.configure(sink, ctx);
@@ -507,16 +511,16 @@ public class TestHBaseSink {
   }
 
   @Test
-  public void testWithoutConfigurationObject() throws Exception{
+  public void testWithoutConfigurationObject() throws Exception {
     initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     tmpContext.put("batchSize", "2");
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM,
-      ZKConfig.getZKQuorumServersString(conf) );
+                   ZKConfig.getZKQuorumServersString(conf));
     System.out.print(ctx.getString(HBaseSinkConfigurationConstants.ZK_QUORUM));
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-      conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
+                   conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
 
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, tmpContext);
@@ -526,14 +530,14 @@ public class TestHBaseSink {
     sink.start();
     Transaction tx = channel.getTransaction();
     tx.begin();
-    for(int i = 0; i < 3; i++){
+    for (int i = 0; i < 3; i++) {
       Event e = EventBuilder.withBody(Bytes.toBytes(valBase + "-" + i));
       channel.put(e);
     }
     tx.commit();
     tx.close();
     Status status = Status.READY;
-    while(status != Status.BACKOFF){
+    while (status != Status.BACKOFF) {
       status = sink.process();
     }
     sink.stop();
@@ -541,9 +545,9 @@ public class TestHBaseSink {
     byte[][] results = getResults(table, 3);
     byte[] out;
     int found = 0;
-    for(int i = 0; i < 3; i++){
-      for(int j = 0; j < 3; j++){
-        if(Arrays.equals(results[j],Bytes.toBytes(valBase + "-" + i))){
+    for (int i = 0; i < 3; i++) {
+      for (int j = 0; j < 3; j++) {
+        if (Arrays.equals(results[j], Bytes.toBytes(valBase + "-" + i))) {
           found++;
           break;
         }
@@ -555,37 +559,36 @@ public class TestHBaseSink {
   }
 
   @Test
-  public void testZKQuorum() throws Exception{
+  public void testZKQuorum() throws Exception {
     initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
     String zkQuorum = "zk1.flume.apache.org:3342, zk2.flume.apache.org:3342, " 
+
-      "zk3.flume.apache.org:3342";
+                      "zk3.flume.apache.org:3342";
     tmpContext.put("batchSize", "2");
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-      conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
+                   conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, tmpContext);
     Assert.assertEquals("zk1.flume.apache.org,zk2.flume.apache.org," +
-      "zk3.flume.apache.org", sink.getConfig().get(HConstants
-      .ZOOKEEPER_QUORUM));
-    Assert.assertEquals(String.valueOf(3342), sink.getConfig().get(HConstants
-      .ZOOKEEPER_CLIENT_PORT));
+                        "zk3.flume.apache.org", 
sink.getConfig().get(HConstants.ZOOKEEPER_QUORUM));
+    Assert.assertEquals(String.valueOf(3342),
+                        
sink.getConfig().get(HConstants.ZOOKEEPER_CLIENT_PORT));
   }
 
-  @Test (expected = FlumeException.class)
-  public void testZKQuorumIncorrectPorts() throws Exception{
+  @Test(expected = FlumeException.class)
+  public void testZKQuorumIncorrectPorts() throws Exception {
     initContextForSimpleHbaseEventSerializer();
     Context tmpContext = new Context(ctx.getParameters());
 
     String zkQuorum = "zk1.flume.apache.org:3345, zk2.flume.apache.org:3342, " 
+
-      "zk3.flume.apache.org:3342";
+                      "zk3.flume.apache.org:3342";
     tmpContext.put("batchSize", "2");
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_QUORUM, zkQuorum);
     tmpContext.put(HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
-      conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
+                   conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+                            HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT));
     HBaseSink sink = new HBaseSink();
     Configurables.configure(sink, tmpContext);
     Assert.fail();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
index b102b49..24bcf37 100644
--- 
a/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
+++ 
b/flume-ng-sinks/flume-ng-hbase-sink/src/test/java/org/apache/flume/sink/hbase/TestRegexHbaseEventSerializer.java
@@ -18,17 +18,7 @@
  */
 package org.apache.flume.sink.hbase;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.nio.charset.Charset;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.Maps;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
@@ -39,7 +29,16 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 
-import com.google.common.collect.Maps;
+import java.nio.charset.Charset;
+import java.util.Calendar;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestRegexHbaseEventSerializer {
 
@@ -76,8 +75,7 @@ public class TestRegexHbaseEventSerializer {
   public void testRowIndexKey() throws Exception {
     RegexHbaseEventSerializer s = new RegexHbaseEventSerializer();
     Context context = new Context();
-    context.put(RegexHbaseEventSerializer.REGEX_CONFIG,"^([^\t]+)\t([^\t]+)\t" 
+
-      "([^\t]+)$");
+    context.put(RegexHbaseEventSerializer.REGEX_CONFIG,"^([^\t]+)\t([^\t]+)\t" 
+ "([^\t]+)$");
     context.put(RegexHbaseEventSerializer.COL_NAME_CONFIG, 
"col1,col2,ROW_KEY");
     context.put("rowKeyIndex", "2");
     s.configure(context);
@@ -115,9 +113,9 @@ public class TestRegexHbaseEventSerializer {
         "referer,agent");
     s.configure(context);
     String logMsg = "33.22.11.00 - - [20/May/2011:07:01:19 +0000] " +
-      "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " + 
-      "\"http://www.cloudera.com/wp-admin/install.php\"; \"Mozilla/5.0 (comp" +
-      "atible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)\"";
+        "\"GET /wp-admin/css/install.css HTTP/1.0\" 200 813 " +
+        "\"http://www.cloudera.com/wp-admin/install.php\"; \"Mozilla/5.0 (comp" 
+
+        "atible; Yahoo! Slurp; http://help.yahoo.com/help/us/ysearch/slurp)\"";
     
     Event e = EventBuilder.withBody(Bytes.toBytes(logMsg));
     s.initialize(e, "CF".getBytes());
@@ -189,7 +187,7 @@ public class TestRegexHbaseEventSerializer {
     
   }
 
-   @Test
+  @Test
   /** Test depositing of the header information. */
   public void testDepositHeaders() throws Exception {
     Charset charset = Charset.forName("KOI8-R");
@@ -222,7 +220,8 @@ public class TestRegexHbaseEventSerializer {
       resultMap.put(new String(kv.getQualifier(), charset), kv.getValue());
     }
 
-    assertEquals(body, new 
String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset));
+    assertEquals(body,
+                 new 
String(resultMap.get(RegexHbaseEventSerializer.COLUMN_NAME_DEFAULT), charset));
     assertEquals("value1", new String(resultMap.get("header1"), charset));
     assertArrayEquals("значение2".getBytes(charset), 
resultMap.get("заголовок2"));
     assertEquals("значение2".length(), 
resultMap.get("заголовок2").length);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
index f577e98..76eca37 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/TestKafkaSink.java
@@ -18,8 +18,8 @@
 
 package org.apache.flume.sink.kafka;
 
+import com.google.common.base.Charsets;
 import kafka.message.MessageAndMetadata;
-
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.specific.SpecificDatumReader;
@@ -41,8 +41,6 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
-
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -52,12 +50,21 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.AVRO_EVENT;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.BATCH_SIZE;
+import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.BROKER_LIST_FLUME_KEY;
+import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_KEY_SERIALIZER;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.DEFAULT_TOPIC;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PREFIX;
+import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.KAFKA_PRODUCER_PREFIX;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.OLD_BATCH_SIZE;
+import static 
org.apache.flume.sink.kafka.KafkaSinkConstants.REQUIRED_ACKS_FLUME_KEY;
+import static org.apache.flume.sink.kafka.KafkaSinkConstants.TOPIC_CONFIG;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
-import static org.apache.flume.sink.kafka.KafkaSinkConstants.*;
-
 /**
  * Unit tests for Kafka Sink
  */
@@ -86,40 +93,45 @@ public class TestKafkaSink {
     KafkaSink kafkaSink = new KafkaSink();
     Context context = new Context();
     context.put(KAFKA_PREFIX + TOPIC_CONFIG, "");
-    context.put(KAFKA_PRODUCER_PREFIX + 
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "override.default.serializer");
+    context.put(KAFKA_PRODUCER_PREFIX + 
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                "override.default.serializer");
     context.put("kafka.producer.fake.property", "kafka.property.value");
     context.put("kafka.bootstrap.servers", "localhost:9092,localhost:9092");
-    context.put("brokerList","real-broker-list");
-    Configurables.configure(kafkaSink,context);
+    context.put("brokerList", "real-broker-list");
+    Configurables.configure(kafkaSink, context);
 
     Properties kafkaProps = kafkaSink.getKafkaProps();
 
     //check that we have defaults set
-    assertEquals(
-            
kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), 
DEFAULT_KEY_SERIALIZER);
+    
assertEquals(kafkaProps.getProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG),
+                 DEFAULT_KEY_SERIALIZER);
     //check that kafka properties override the default and get correct name
-    
assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG),
 "override.default.serializer");
+    
assertEquals(kafkaProps.getProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG),
+                 "override.default.serializer");
     //check that any kafka-producer property gets in
-    assertEquals(kafkaProps.getProperty("fake.property"), 
"kafka.property.value");
+    assertEquals(kafkaProps.getProperty("fake.property"),
+                 "kafka.property.value");
     //check that documented property overrides defaults
-    assertEquals(kafkaProps.getProperty("bootstrap.servers") 
,"localhost:9092,localhost:9092");
+    assertEquals(kafkaProps.getProperty("bootstrap.servers"),
+                 "localhost:9092,localhost:9092");
   }
 
   @Test
   public void testOldProperties() {
     KafkaSink kafkaSink = new KafkaSink();
     Context context = new Context();
-    context.put("topic","test-topic");
+    context.put("topic", "test-topic");
     context.put(OLD_BATCH_SIZE, "300");
-    context.put(BROKER_LIST_FLUME_KEY,"localhost:9092,localhost:9092");
+    context.put(BROKER_LIST_FLUME_KEY, "localhost:9092,localhost:9092");
     context.put(REQUIRED_ACKS_FLUME_KEY, "all");
-    Configurables.configure(kafkaSink,context);
+    Configurables.configure(kafkaSink, context);
 
     Properties kafkaProps = kafkaSink.getKafkaProps();
 
     assertEquals(kafkaSink.getTopic(), "test-topic");
-    assertEquals(kafkaSink.getBatchSize(),300);
-    
assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),"localhost:9092,localhost:9092");
+    assertEquals(kafkaSink.getBatchSize(), 300);
+    
assertEquals(kafkaProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                 "localhost:9092,localhost:9092");
     assertEquals(kafkaProps.getProperty(ProducerConfig.ACKS_CONFIG), "all");
 
   }
@@ -151,9 +163,8 @@ public class TestKafkaSink {
       // ignore
     }
 
-    String fetchedMsg = new String((byte[])
-      testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)
-        .message());
+    String fetchedMsg = new String((byte[]) 
testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC)
+                                                    .message());
     assertEquals(msg, fetchedMsg);
   }
 
@@ -173,14 +184,13 @@ public class TestKafkaSink {
       // ignore
     }
 
-    String fetchedMsg = new String((byte[]) 
testUtil.getNextMessageFromConsumer(TestConstants.STATIC_TOPIC).message());
+    String fetchedMsg = new String((byte[]) 
testUtil.getNextMessageFromConsumer(
+        TestConstants.STATIC_TOPIC).message());
     assertEquals(msg, fetchedMsg);
   }
 
   @Test
   public void testTopicAndKeyFromHeader() throws UnsupportedEncodingException {
-
-
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     Configurables.configure(kafkaSink, context);
@@ -210,19 +220,16 @@ public class TestKafkaSink {
     }
 
     MessageAndMetadata fetchedMsg =
-      testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+        testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
 
     assertEquals(msg, new String((byte[]) fetchedMsg.message(), "UTF-8"));
     assertEquals(TestConstants.CUSTOM_KEY,
-      new String((byte[]) fetchedMsg.key(), "UTF-8"));
-
+                 new String((byte[]) fetchedMsg.key(), "UTF-8"));
   }
 
   @SuppressWarnings("rawtypes")
   @Test
   public void testAvroEvent() throws IOException {
-
-
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     context.put(AVRO_EVENT, "true");
@@ -254,13 +261,12 @@ public class TestKafkaSink {
       // ignore
     }
 
-    MessageAndMetadata fetchedMsg =
-      testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
+    MessageAndMetadata fetchedMsg = 
testUtil.getNextMessageFromConsumer(TestConstants.CUSTOM_TOPIC);
 
-    ByteArrayInputStream in =
-        new ByteArrayInputStream((byte[])fetchedMsg.message());
+    ByteArrayInputStream in = new ByteArrayInputStream((byte[]) 
fetchedMsg.message());
     BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
-    SpecificDatumReader<AvroFlumeEvent> reader = new 
SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class);
+    SpecificDatumReader<AvroFlumeEvent> reader =
+        new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class);
 
     AvroFlumeEvent avroevent = reader.read(null, decoder);
 
@@ -268,17 +274,15 @@ public class TestKafkaSink {
     Map<CharSequence, CharSequence> eventHeaders = avroevent.getHeaders();
 
     assertEquals(msg, eventBody);
-    assertEquals(TestConstants.CUSTOM_KEY,
-      new String((byte[]) fetchedMsg.key(), "UTF-8"));
+    assertEquals(TestConstants.CUSTOM_KEY, new String((byte[]) 
fetchedMsg.key(), "UTF-8"));
 
-    assertEquals(TestConstants.HEADER_1_VALUE, eventHeaders.get(new 
Utf8(TestConstants.HEADER_1_KEY)).toString());
+    assertEquals(TestConstants.HEADER_1_VALUE,
+                 eventHeaders.get(new 
Utf8(TestConstants.HEADER_1_KEY)).toString());
     assertEquals(TestConstants.CUSTOM_KEY, eventHeaders.get(new 
Utf8("key")).toString());
-
   }
 
   @Test
-  public void testEmptyChannel() throws UnsupportedEncodingException,
-          EventDeliveryException {
+  public void testEmptyChannel() throws UnsupportedEncodingException, 
EventDeliveryException {
     Sink kafkaSink = new KafkaSink();
     Context context = prepareDefaultContext();
     Configurables.configure(kafkaSink, context);
@@ -291,8 +295,7 @@ public class TestKafkaSink {
     if (status != Sink.Status.BACKOFF) {
       fail("Error Occurred");
     }
-    assertNull(
-      testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC));
+    assertNull(testUtil.getNextMessageFromConsumer(DEFAULT_TOPIC));
   }
 
   private Context prepareDefaultContext() {
@@ -304,7 +307,7 @@ public class TestKafkaSink {
   }
 
   private Sink.Status prepareAndSend(Context context, String msg)
-    throws EventDeliveryException {
+      throws EventDeliveryException {
     Sink kafkaSink = new KafkaSink();
     Configurables.configure(kafkaSink, context);
     Channel memoryChannel = new MemoryChannel();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
index d8a45ef..6d89bd3 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/KafkaLocal.java
@@ -1,19 +1,19 @@
 /**
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flume.sink.kafka.util;
@@ -30,23 +30,22 @@ import java.util.Properties;
  */
 public class KafkaLocal {
 
-    public KafkaServerStartable kafka;
-    public ZooKeeperLocal zookeeper;
+  public KafkaServerStartable kafka;
+  public ZooKeeperLocal zookeeper;
 
-    public KafkaLocal(Properties kafkaProperties) throws IOException,
-        InterruptedException{
-        KafkaConfig kafkaConfig = KafkaConfig.fromProps(kafkaProperties);
+  public KafkaLocal(Properties kafkaProperties) throws IOException, 
InterruptedException {
+    KafkaConfig kafkaConfig = KafkaConfig.fromProps(kafkaProperties);
 
-        //start local kafka broker
-        kafka = new KafkaServerStartable(kafkaConfig);
-    }
+    // start local kafka broker
+    kafka = new KafkaServerStartable(kafkaConfig);
+  }
 
-    public void start() throws Exception{
-        kafka.startup();
-    }
+  public void start() throws Exception {
+    kafka.startup();
+  }
 
-    public void stop(){
-        kafka.shutdown();
-    }
+  public void stop() {
+    kafka.shutdown();
+  }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
index 1a5728f..35c1e47 100644
--- 
a/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
+++ 
b/flume-ng-sinks/flume-ng-kafka-sink/src/test/java/org/apache/flume/sink/kafka/util/ZooKeeperLocal.java
@@ -1,19 +1,19 @@
 /**
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * limitations under the License.
  */
 
 package org.apache.flume.sink.kafka.util;
@@ -33,30 +33,29 @@ import java.util.Properties;
  */
 public class ZooKeeperLocal {
 
-  private static final Logger logger =
-    LoggerFactory.getLogger(ZooKeeperLocal.class);
-    private ZooKeeperServerMain zooKeeperServer;
+  private static final Logger logger = 
LoggerFactory.getLogger(ZooKeeperLocal.class);
+  private ZooKeeperServerMain zooKeeperServer;
 
-    public ZooKeeperLocal(Properties zkProperties) throws IOException{
-        QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+  public ZooKeeperLocal(Properties zkProperties) throws IOException {
+    QuorumPeerConfig quorumConfiguration = new QuorumPeerConfig();
+    try {
+      quorumConfiguration.parseProperties(zkProperties);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    zooKeeperServer = new ZooKeeperServerMain();
+    final ServerConfig configuration = new ServerConfig();
+    configuration.readFrom(quorumConfiguration);
+
+    new Thread() {
+      public void run() {
         try {
-            quorumConfiguration.parseProperties(zkProperties);
-        } catch(Exception e) {
-            throw new RuntimeException(e);
+          zooKeeperServer.runFromConfig(configuration);
+        } catch (IOException e) {
+          logger.error("Zookeeper startup failed.", e);
         }
-
-        zooKeeperServer = new ZooKeeperServerMain();
-        final ServerConfig configuration = new ServerConfig();
-        configuration.readFrom(quorumConfiguration);
-
-        new Thread() {
-            public void run() {
-                try {
-                    zooKeeperServer.runFromConfig(configuration);
-                } catch (IOException e) {
-                    logger.error("Zookeeper startup failed.", e);
-                }
-            }
-        }.start();
-    }
+      }
+    }.start();
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
index 6172c68..be377ba 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestBlobDeserializer.java
@@ -16,9 +16,7 @@
  */
 package org.apache.flume.sink.solr.morphline;
 
-import java.io.IOException;
-import java.util.List;
-
+import com.google.common.base.Charsets;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.serialization.EventDeserializer;
@@ -28,7 +26,8 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.base.Charsets;
+import java.io.IOException;
+import java.util.List;
 
 public class TestBlobDeserializer extends Assert {
 
@@ -61,7 +60,8 @@ public class TestBlobDeserializer extends Assert {
   public void testSimpleViaFactory() throws IOException {
     ResettableInputStream in = new ResettableTestStringInputStream(mini);
     EventDeserializer des;
-    des = 
EventDeserializerFactory.getInstance(BlobDeserializer.Builder.class.getName(), 
new Context(), in);
+    des = 
EventDeserializerFactory.getInstance(BlobDeserializer.Builder.class.getName(),
+                                               new Context(), in);
     validateMiniParse(des);
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java
index 22cfe96..8d62d38 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineInterceptor.java
@@ -16,22 +16,21 @@
  */
 package org.apache.flume.sink.solr.morphline;
 
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.event.EventBuilder;
 import org.junit.Assert;
 import org.junit.Test;
-
 import org.kitesdk.morphline.base.Fields;
-import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.io.Files;
+
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 public class TestMorphlineInterceptor extends Assert {
 
@@ -40,13 +39,15 @@ public class TestMorphlineInterceptor extends Assert {
   @Test
   public void testNoOperation() throws Exception {
     Context context = new Context();
-    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + 
"/test-morphlines/noOperation.conf");
+    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM,
+                RESOURCES_DIR + "/test-morphlines/noOperation.conf");
     Event input = EventBuilder.withBody("foo", Charsets.UTF_8);
     input.getHeaders().put("name", "nadja");
     MorphlineInterceptor interceptor = build(context);
     Event actual = interceptor.intercept(input);
     interceptor.close();
-    Event expected = EventBuilder.withBody("foo".getBytes("UTF-8"), 
ImmutableMap.of("name", "nadja"));
+    Event expected = EventBuilder.withBody("foo".getBytes("UTF-8"),
+                                           ImmutableMap.of("name", "nadja"));
     assertEqualsEvent(expected, actual);
     
     List<Event> actualList = 
build(context).intercept(Collections.singletonList(input));
@@ -57,11 +58,13 @@ public class TestMorphlineInterceptor extends Assert {
   @Test
   public void testReadClob() throws Exception {
     Context context = new Context();
-    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + 
"/test-morphlines/readClob.conf");
+    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM,
+                RESOURCES_DIR + "/test-morphlines/readClob.conf");
     Event input = EventBuilder.withBody("foo", Charsets.UTF_8);
     input.getHeaders().put("name", "nadja");
     Event actual = build(context).intercept(input);
-    Event expected = EventBuilder.withBody(null, ImmutableMap.of("name", 
"nadja", Fields.MESSAGE, "foo"));
+    Event expected = EventBuilder.withBody(null,
+                                           ImmutableMap.of("name", "nadja", 
Fields.MESSAGE, "foo"));
     assertEqualsEvent(expected, actual);
 
     List<Event> actualList = 
build(context).intercept(Collections.singletonList(input));
@@ -72,7 +75,8 @@ public class TestMorphlineInterceptor extends Assert {
   @Test
   public void testGrokIfNotMatchDropEventRetain() throws Exception {
     Context context = new Context();
-    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + 
"/test-morphlines/grokIfNotMatchDropRecord.conf");
+    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM,
+                RESOURCES_DIR + 
"/test-morphlines/grokIfNotMatchDropRecord.conf");
 
     String msg = "<164>Feb  4 10:46:14 syslog sshd[607]: Server listening on 
0.0.0.0 port 22.";
     Event input = EventBuilder.withBody(null, ImmutableMap.of(Fields.MESSAGE, 
msg));
@@ -94,8 +98,10 @@ public class TestMorphlineInterceptor extends Assert {
   /* leading XXXXX does not match regex, thus we expect the event to be 
dropped */
   public void testGrokIfNotMatchDropEventDrop() throws Exception {
     Context context = new Context();
-    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + 
"/test-morphlines/grokIfNotMatchDropRecord.conf");
-    String msg = "<XXXXXXXXXXXXX164>Feb  4 10:46:14 syslog sshd[607]: Server 
listening on 0.0.0.0 port 22.";
+    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM,
+                RESOURCES_DIR + 
"/test-morphlines/grokIfNotMatchDropRecord.conf");
+    String msg = "<XXXXXXXXXXXXX164>Feb  4 10:46:14 syslog sshd[607]: Server 
listening on 0.0.0.0" +
+                     " port 22.";
     Event input = EventBuilder.withBody(null, ImmutableMap.of(Fields.MESSAGE, 
msg));
     Event actual = build(context).intercept(input);
     assertNull(actual);
@@ -105,10 +111,12 @@ public class TestMorphlineInterceptor extends Assert {
   /** morphline says route to southpole if it's an avro file, otherwise route 
to northpole */
   public void testIfDetectMimeTypeRouteToSouthPole() throws Exception {
     Context context = new Context();
-    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + 
"/test-morphlines/ifDetectMimeType.conf");
+    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM,
+                RESOURCES_DIR + "/test-morphlines/ifDetectMimeType.conf");
     context.put(MorphlineHandlerImpl.MORPHLINE_VARIABLE_PARAM + 
".MY.MIME_TYPE", "avro/binary");
 
-    Event input = EventBuilder.withBody(Files.toByteArray(new 
File(RESOURCES_DIR + "/test-documents/sample-statuses-20120906-141433.avro")));
+    Event input = EventBuilder.withBody(Files.toByteArray(
+        new File(RESOURCES_DIR + 
"/test-documents/sample-statuses-20120906-141433.avro")));
     Event actual = build(context).intercept(input);
 
     Map<String, String> expected = new HashMap();
@@ -122,10 +130,12 @@ public class TestMorphlineInterceptor extends Assert {
   /** morphline says route to southpole if it's an avro file, otherwise route 
to northpole */
   public void testIfDetectMimeTypeRouteToNorthPole() throws Exception {
     Context context = new Context();
-    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM, RESOURCES_DIR + 
"/test-morphlines/ifDetectMimeType.conf");
+    context.put(MorphlineHandlerImpl.MORPHLINE_FILE_PARAM,
+                RESOURCES_DIR + "/test-morphlines/ifDetectMimeType.conf");
     context.put(MorphlineHandlerImpl.MORPHLINE_VARIABLE_PARAM + 
".MY.MIME_TYPE", "avro/binary");
 
-    Event input = EventBuilder.withBody(Files.toByteArray(new 
File(RESOURCES_DIR + "/test-documents/testPDF.pdf")));
+    Event input = EventBuilder.withBody(
+        Files.toByteArray(new File(RESOURCES_DIR + 
"/test-documents/testPDF.pdf")));
     Event actual = build(context).intercept(input);
 
     Map<String, String> expected = new HashMap();
@@ -140,8 +150,9 @@ public class TestMorphlineInterceptor extends Assert {
     builder.configure(context);
     return builder.build();
   }
-  
-  private void assertEqualsEvent(Event x, Event y) { // b/c SimpleEvent 
doesn't implement equals() method :-(
+
+  // b/c SimpleEvent doesn't implement equals() method :-(
+  private void assertEqualsEvent(Event x, Event y) {
     assertEquals(x.getHeaders(), y.getHeaders());
     assertArrayEquals(x.getBody(), y.getBody());
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
index 232c092..1bfae95 100644
--- 
a/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
+++ 
b/flume-ng-sinks/flume-ng-morphline-solr-sink/src/test/java/org/apache/flume/sink/solr/morphline/TestMorphlineSolrSink.java
@@ -87,8 +87,7 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 {
     initCore(
         RESOURCES_DIR + "/solr/collection1/conf/solrconfig.xml", 
         RESOURCES_DIR + "/solr/collection1/conf/schema.xml",
-        RESOURCES_DIR + "/solr"
-        );
+        RESOURCES_DIR + "/solr");
   }
 
   @Before
@@ -139,9 +138,9 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 {
     int batchSize = SEQ_NUM2.incrementAndGet() % 2 == 0 ? 100 : 1;
     DocumentLoader testServer = new SolrServerDocumentLoader(solrServer, 
batchSize);
     MorphlineContext solrMorphlineContext = new SolrMorphlineContext.Builder()
-      .setDocumentLoader(testServer)
-      .setExceptionHandler(new FaultTolerance(false, false, 
SolrServerException.class.getName()))
-      .setMetricRegistry(new MetricRegistry()).build();
+        .setDocumentLoader(testServer)
+        .setExceptionHandler(new FaultTolerance(false, false, 
SolrServerException.class.getName()))
+        .setMetricRegistry(new MetricRegistry()).build();
     
     MorphlineHandlerImpl impl = new MorphlineHandlerImpl();
     impl.setMorphlineContext(solrMorphlineContext);
@@ -302,9 +301,11 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 {
     QueryResponse rsp = query("*:*");
     Iterator<SolrDocument> iter = rsp.getResults().iterator();
     ListMultimap<String, String> expectedFieldValues;
-    expectedFieldValues = ImmutableListMultimap.of("id", "1234567890", "text", 
"sample tweet one", "user_screen_name", "fake_user1");
+    expectedFieldValues = ImmutableListMultimap.of("id", "1234567890", "text", 
"sample tweet one",
+                                                   "user_screen_name", 
"fake_user1");
     assertEquals(expectedFieldValues, next(iter));
-    expectedFieldValues = ImmutableListMultimap.of("id", "2345678901", "text", 
"sample tweet two", "user_screen_name", "fake_user2");  
+    expectedFieldValues = ImmutableListMultimap.of("id", "2345678901", "text", 
"sample tweet two",
+                                                   "user_screen_name", 
"fake_user2");
     assertEquals(expectedFieldValues, next(iter));
     assertFalse(iter.hasNext());
   }
@@ -398,8 +399,8 @@ public class TestMorphlineSolrSink extends SolrTestCaseJ4 {
     
     float secs = (System.currentTimeMillis() - startTime) / 1000.0f;
     long numDocs = queryResultSetSize("*:*");
-    LOGGER.info("Took secs: " + secs + ", iters/sec: " + (iters/secs));
-    LOGGER.info("Took secs: " + secs + ", docs/sec: " + (numDocs/secs));
+    LOGGER.info("Took secs: " + secs + ", iters/sec: " + (iters / secs));
+    LOGGER.info("Took secs: " + secs + ", docs/sec: " + (numDocs / secs));
     LOGGER.info("Iterations: " + iters + ", numDocs: " + numDocs);
     LOGGER.info("sink: ", sink);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
index 6881967..b8466f7 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/JMSMessageConsumerTestBase.java
@@ -92,13 +92,10 @@ public abstract class JMSMessageConsumerTestBase {
       }
     });
     when(message.getText()).thenReturn(TEXT);
-    when(connectionFactory.createConnection(USERNAME, PASSWORD)).
-      thenReturn(connection);
-    when(connection.createSession(true, Session.SESSION_TRANSACTED)).
-      thenReturn(session);
+    when(connectionFactory.createConnection(USERNAME, 
PASSWORD)).thenReturn(connection);
+    when(connection.createSession(true, 
Session.SESSION_TRANSACTED)).thenReturn(session);
     when(session.createQueue(destinationName)).thenReturn(queue);
-    when(session.createConsumer(any(Destination.class), anyString()))
-      .thenReturn(messageConsumer);
+    when(session.createConsumer(any(Destination.class), 
anyString())).thenReturn(messageConsumer);
     when(messageConsumer.receiveNoWait()).thenReturn(message);
     when(messageConsumer.receive(anyLong())).thenReturn(message);
     destinationName = DESTINATION_NAME;
@@ -127,7 +124,7 @@ public abstract class JMSMessageConsumerTestBase {
 
   }
   void assertBodyIsExpected(List<Event> events) {
-    for(Event event : events) {
+    for (Event event : events) {
       assertEquals(TEXT, new String(event.getBody(), Charsets.UTF_8));
     }
   }
@@ -140,7 +137,7 @@ public abstract class JMSMessageConsumerTestBase {
   @After
   public void tearDown() throws Exception {
     beforeTearDown();
-    if(consumer != null) {
+    if (consumer != null) {
       consumer.close();
     }
     afterTearDown();

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java
index 8d413f7..0b2193c 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestDefaultJMSMessageConverter.java
@@ -73,7 +73,7 @@ public class TestDefaultJMSMessageConverter {
       @Override
       public Integer answer(InvocationOnMock invocation) throws Throwable {
         byte[] buffer = (byte[])invocation.getArguments()[0];
-        if(buffer != null) {
+        if (buffer != null) {
           assertEquals(buffer.length, BYTES.length);
           System.arraycopy(BYTES, 0, buffer, 0, BYTES.length);
         }

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
index e28e02a..53cc09a 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestIntegrationActiveMQ.java
@@ -56,9 +56,10 @@ import com.google.common.io.Files;
 
 public class TestIntegrationActiveMQ {
 
-  private final static String INITIAL_CONTEXT_FACTORY = 
"org.apache.activemq.jndi.ActiveMQInitialContextFactory";
+  private static final String INITIAL_CONTEXT_FACTORY =
+      "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
   public static final String BROKER_BIND_URL = "tcp://localhost:61516";
-  private final static  String DESTINATION_NAME = "test";
+  private static final String DESTINATION_NAME = "test";
   private static final String USERNAME = "user";
   private static final String PASSWORD = "pass";
   // specific for dynamic queues on ActiveMq
@@ -115,15 +116,14 @@ public class TestIntegrationActiveMQ {
       }
     }).when(channelProcessor).processEventBatch(any(List.class));
     source.setChannelProcessor(channelProcessor);
-
-
   }
+
   @After
   public void tearDown() throws Exception {
-    if(source != null) {
+    if (source != null) {
       source.stop();
     }
-    if(broker != null) {
+    if (broker != null) {
       broker.stop();
     }
     FileUtils.deleteDirectory(baseDir);
@@ -140,8 +140,7 @@ public class TestIntegrationActiveMQ {
     Destination destination = session.createQueue(DESTINATION_NAME);
     MessageProducer producer = session.createProducer(destination);
 
-
-    for(String event : events) {
+    for (String event : events) {
       TextMessage message = session.createTextMessage();
       message.setText(event);
       producer.send(message);
@@ -162,8 +161,7 @@ public class TestIntegrationActiveMQ {
     Destination destination = session.createTopic(DESTINATION_NAME);
     MessageProducer producer = session.createProducer(destination);
 
-
-    for(String event : events) {
+    for (String event : events) {
       TextMessage message = session.createTextMessage();
       message.setText(event);
       producer.send(message);
@@ -202,13 +200,14 @@ public class TestIntegrationActiveMQ {
     Assert.assertEquals(Status.BACKOFF, source.process());
     Assert.assertEquals(expected.size(), events.size());
     List<String> actual = Lists.newArrayList();
-    for(Event event : events) {
+    for (Event event : events) {
       actual.add(new String(event.getBody(), Charsets.UTF_8));
     }
     Collections.sort(expected);
     Collections.sort(actual);
     Assert.assertEquals(expected, actual);
   }
+
   @Test
   public void testTopic() throws Exception {
     context.put(JMSSourceConfiguration.DESTINATION_TYPE,
@@ -229,7 +228,7 @@ public class TestIntegrationActiveMQ {
     Assert.assertEquals(Status.BACKOFF, source.process());
     Assert.assertEquals(expected.size(), events.size());
     List<String> actual = Lists.newArrayList();
-    for(Event event : events) {
+    for (Event event : events) {
       actual.add(new String(event.getBody(), Charsets.UTF_8));
     }
     Collections.sort(expected);

http://git-wip-us.apache.org/repos/asf/flume/blob/cfbf1156/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
index 9bace82..dcb47d9 100644
--- 
a/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
+++ 
b/flume-ng-sources/flume-jms-source/src/test/java/org/apache/flume/source/jms/TestJMSMessageConsumer.java
@@ -105,8 +105,7 @@ public class TestJMSMessageConsumer extends 
JMSMessageConsumerTestBase {
   @Test
   public void testNoUserPass() throws Exception {
     userName = Optional.absent();
-    when(connectionFactory.createConnection(USERNAME, PASSWORD)).
-      thenThrow(new AssertionError());
+    when(connectionFactory.createConnection(USERNAME, PASSWORD)).thenThrow(new 
AssertionError());
     when(connectionFactory.createConnection()).thenReturn(connection);
     consumer = create();
     List<Event> events = consumer.take();

Reply via email to