http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 9a48841..741f01e 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -63,8 +63,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     public void run(String filePath);
   }
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(HDFSEventSink.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSEventSink.class);
 
   private static String DIRECTORY_DELIMITER = System.getProperty("file.separator");
 
@@ -98,7 +97,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final int defaultThreadPoolSize = 10;
   private static final int defaultRollTimerPoolSize = 1;
 
-
   private final HDFSWriterFactory writerFactory;
   private WriterLinkedHashMap sfWriters;
 
@@ -217,23 +215,21 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     String kerbKeytab = context.getString("hdfs.kerberosKeytab");
     String proxyUser = context.getString("hdfs.proxyUser");
     tryCount = context.getInteger("hdfs.closeTries", defaultTryCount);
-    if(tryCount <= 0) {
+    if (tryCount <= 0) {
       LOG.warn("Retry count value : " + tryCount + " is not " +
-        "valid. The sink will try to close the file until the file " +
-        "is eventually closed.");
+          "valid. The sink will try to close the file until the file " +
+          "is eventually closed.");
       tryCount = defaultTryCount;
     }
-    retryInterval = context.getLong("hdfs.retryInterval",
-      defaultRetryInterval);
-    if(retryInterval <= 0) {
+    retryInterval = context.getLong("hdfs.retryInterval", defaultRetryInterval);
+    if (retryInterval <= 0) {
       LOG.warn("Retry Interval value: " + retryInterval + " is not " +
-        "valid. If the first close of a file fails, " +
-        "it may remain open and will not be renamed.");
+          "valid. If the first close of a file fails, " +
+          "it may remain open and will not be renamed.");
       tryCount = 1;
     }
 
-    Preconditions.checkArgument(batchSize > 0,
-        "batchSize must be greater than 0");
+    Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than 0");
     if (codecName == null) {
       codeC = null;
       compType = CompressionType.NONE;
@@ -245,14 +241,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
     // Do not allow user to set fileType DataStream with codeC together
     // To prevent output file with compress extension (like .snappy)
-    if(fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType)
-        && codecName != null) {
+    if (fileType.equalsIgnoreCase(HDFSWriterFactory.DataStreamType) && codecName != null) {
       throw new IllegalArgumentException("fileType: " + fileType +
           " which does NOT support compressed output. Please don't set codeC" +
           " or change the fileType if compressed output is desired.");
     }
 
-    if(fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
+    if (fileType.equalsIgnoreCase(HDFSWriterFactory.CompStreamType)) {
       Preconditions.checkNotNull(codeC, "It's essential to set compress codec"
           + " when fileType is: " + fileType);
     }
@@ -261,18 +256,15 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     this.privExecutor = FlumeAuthenticationUtil.getAuthenticator(
             kerbConfPrincipal, kerbKeytab).proxyAs(proxyUser);
 
-
-
-
     needRounding = context.getBoolean("hdfs.round", false);
 
-    if(needRounding) {
+    if (needRounding) {
       String unit = context.getString("hdfs.roundUnit", "second");
       if (unit.equalsIgnoreCase("hour")) {
         this.roundUnit = Calendar.HOUR_OF_DAY;
       } else if (unit.equalsIgnoreCase("minute")) {
         this.roundUnit = Calendar.MINUTE;
-      } else if (unit.equalsIgnoreCase("second")){
+      } else if (unit.equalsIgnoreCase("second")) {
         this.roundUnit = Calendar.SECOND;
       } else {
         LOG.warn("Rounding unit is not valid, please set one of" +
@@ -280,11 +272,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         needRounding = false;
       }
       this.roundValue = context.getInteger("hdfs.roundValue", 1);
-      if(roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
+      if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
         Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
             "Round value" +
             "must be > 0 and <= 60");
-      } else if (roundUnit == Calendar.HOUR_OF_DAY){
+      } else if (roundUnit == Calendar.HOUR_OF_DAY) {
         Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
             "Round value" +
             "must be > 0 and <= 24");
@@ -292,7 +284,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
 
     this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false);
-    if(useLocalTime) {
+    if (useLocalTime) {
       clock = new SystemClock();
     }
 
@@ -301,16 +293,13 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
   }
 
-  private static boolean codecMatches(Class<? extends CompressionCodec> cls,
-      String codecName) {
+  private static boolean codecMatches(Class<? extends CompressionCodec> cls, String codecName) {
     String simpleName = cls.getSimpleName();
-    if (cls.getName().equals(codecName)
-        || simpleName.equalsIgnoreCase(codecName)) {
+    if (cls.getName().equals(codecName) || simpleName.equalsIgnoreCase(codecName)) {
       return true;
     }
     if (simpleName.endsWith("Codec")) {
-      String prefix = simpleName.substring(0,
-          simpleName.length() - "Codec".length());
+      String prefix = simpleName.substring(0, simpleName.length() - "Codec".length());
       if (prefix.equalsIgnoreCase(codecName)) {
         return true;
       }
@@ -321,8 +310,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   @VisibleForTesting
   static CompressionCodec getCodec(String codecName) {
     Configuration conf = new Configuration();
-    List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory
-        .getCodecClasses(conf);
+    List<Class<? extends CompressionCodec>> codecs = CompressionCodecFactory.getCodecClasses(conf);
     // Wish we could base this on DefaultCodec but appears not all codec's
     // extend DefaultCodec(Lzo)
     CompressionCodec codec = null;
@@ -380,7 +368,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
-          timeZone, needRounding, roundUnit, roundValue, useLocalTime);
+            timeZone, needRounding, roundUnit, roundValue, useLocalTime);
 
         String lookupPath = realPath + DIRECTORY_DELIMITER + realName;
         BucketWriter bucketWriter;
@@ -418,7 +406,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           bucketWriter.append(event);
         } catch (BucketClosedException ex) {
           LOG.info("Bucket was closed while trying to append, " +
-            "reinitializing bucket and writing event.");
+                   "reinitializing bucket and writing event.");
           hdfsWriter = writerFactory.getWriter(fileType);
           bucketWriter = initializeBucketWriter(realPath, realName,
             lookupPath, hdfsWriter, closeCallback);
@@ -468,16 +456,16 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   }
 
   private BucketWriter initializeBucketWriter(String realPath,
-    String realName, String lookupPath, HDFSWriter hdfsWriter,
-    WriterCallback closeCallback) {
+      String realName, String lookupPath, HDFSWriter hdfsWriter,
+      WriterCallback closeCallback) {
     BucketWriter bucketWriter = new BucketWriter(rollInterval,
-      rollSize, rollCount,
-      batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
-      suffix, codeC, compType, hdfsWriter, timedRollerPool,
-      privExecutor, sinkCounter, idleTimeout, closeCallback,
-      lookupPath, callTimeout, callTimeoutPool, retryInterval,
-      tryCount);
-    if(mockFs != null) {
+        rollSize, rollCount,
+        batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
+        suffix, codeC, compType, hdfsWriter, timedRollerPool,
+        privExecutor, sinkCounter, idleTimeout, closeCallback,
+        lookupPath, callTimeout, callTimeoutPool, retryInterval,
+        tryCount);
+    if (mockFs != null) {
       bucketWriter.setFileSystem(mockFs);
       bucketWriter.setMockStream(mockWriter);
     }
@@ -504,7 +492,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
 
     // shut down all our thread pools
-    ExecutorService toShutdown[] = {callTimeoutPool, timedRollerPool};
+    ExecutorService[] toShutdown = { callTimeoutPool, timedRollerPool };
     for (ExecutorService execService : toShutdown) {
       execService.shutdown();
       try {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
index a261cce..ba8b30d 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java
@@ -81,16 +81,15 @@ public class HDFSSequenceFile extends AbstractHDFSWriter {
   protected void open(Path dstPath, CompressionCodec codeC,
       CompressionType compType, Configuration conf, FileSystem hdfs)
           throws IOException {
-    if(useRawLocalFileSystem) {
-      if(hdfs instanceof LocalFileSystem) {
+    if (useRawLocalFileSystem) {
+      if (hdfs instanceof LocalFileSystem) {
         hdfs = ((LocalFileSystem)hdfs).getRaw();
       } else {
         logger.warn("useRawLocalFileSystem is set to true but file system " +
             "is not of type LocalFileSystem: " + hdfs.getClass().getName());
       }
     }
-    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile
-            (dstPath)) {
+    if (conf.getBoolean("hdfs.append.support", false) == true && hdfs.isFile(dstPath)) {
       outStream = hdfs.append(dstPath);
     } else {
       outStream = hdfs.create(dstPath);

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java
index 516988e..43297e2 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/KerberosUser.java
@@ -46,7 +46,9 @@ public class KerberosUser {
       return false;
     }
     final KerberosUser other = (KerberosUser) obj;
-    if ((this.principal == null) ? (other.principal != null) : !this.principal.equals(other.principal)) {
+    if ((this.principal == null) ?
+        (other.principal != null) :
+        !this.principal.equals(other.principal)) {
       return false;
     }
     if ((this.keyTab == null) ? (other.keyTab != null) : !this.keyTab.equals(other.keyTab)) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
index 4351488..2ad7689 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/SequenceFileSerializerType.java
@@ -25,8 +25,7 @@ public enum SequenceFileSerializerType {
 
   private final Class<? extends SequenceFileSerializer.Builder> builderClass;
 
-  SequenceFileSerializerType(
-    Class<? extends SequenceFileSerializer.Builder> builderClass) {
+  SequenceFileSerializerType(Class<? extends SequenceFileSerializer.Builder> builderClass) {
     this.builderClass = builderClass;
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
index 4d70aaa..59520e7 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveDelimitedTextSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flume.sink.hive;
 
-
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.hive.hcatalog.streaming.DelimitedInputWriter;
@@ -60,12 +59,11 @@ public class HiveDelimitedTextSerializer implements HiveEventSerializer  {
 
   @Override
   public RecordWriter createRecordWriter(HiveEndPoint endPoint)
-          throws StreamingException, IOException, ClassNotFoundException {
+      throws StreamingException, IOException, ClassNotFoundException {
     if (serdeSeparator == null) {
       return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint);
     }
-    return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint, null
-            , serdeSeparator);
+    return new DelimitedInputWriter(fieldToColMapping, delimiter, endPoint, null, serdeSeparator);
   }
 
   @Override
@@ -90,8 +88,8 @@ public class HiveDelimitedTextSerializer implements HiveEventSerializer  {
       return null;
     }
     if (delimiter.charAt(0) == '"'  &&
-         delimiter.charAt(delimiter.length()-1) == '"') {
-      return delimiter.substring(1,delimiter.length()-1);
+        delimiter.charAt(delimiter.length() - 1) == '"') {
+      return delimiter.substring(1,delimiter.length() - 1);
     }
     return delimiter;
   }
@@ -105,9 +103,9 @@ public class HiveDelimitedTextSerializer implements HiveEventSerializer  {
       return separatorStr.charAt(0);
     }
     if (separatorStr.length() == 3    &&
-          separatorStr.charAt(2) == '\''  &&
-          separatorStr.charAt(separatorStr.length()-1) == '\'') {
-      return  separatorStr.charAt(1);
+        separatorStr.charAt(2) == '\''  &&
+        separatorStr.charAt(separatorStr.length() - 1) == '\'') {
+      return separatorStr.charAt(1);
     }
 
     throw new IllegalArgumentException("serializer.serdeSeparator spec is invalid " +

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
index 386484c..7ed2c82 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveEventSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flume.sink.hive;
 
-
 import org.apache.flume.Event;
 import org.apache.flume.conf.Configurable;
 import org.apache.hive.hcatalog.streaming.HiveEndPoint;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
index d93bca3..cc5cdca 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveSink.java
@@ -32,7 +32,7 @@ import org.apache.flume.conf.Configurable;
 import org.apache.flume.formatter.output.BucketPath;
 import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
-import org.apache.hive.hcatalog.streaming.*;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 public class HiveSink extends AbstractSink implements Configurable {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(HiveSink.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveSink.class);
 
   private static final int DEFAULT_MAXOPENCONNECTIONS = 500;
   private static final int DEFAULT_TXNSPERBATCH = 100;
@@ -62,7 +61,6 @@ public class HiveSink extends AbstractSink implements Configurable {
   private static final int DEFAULT_IDLETIMEOUT = 0;
   private static final int DEFAULT_HEARTBEATINTERVAL = 240; // seconds
 
-
   private Map<HiveEndPoint, HiveWriter> allWriters;
 
   private SinkCounter sinkCounter;
@@ -162,7 +160,8 @@ public class HiveSink extends AbstractSink implements Configurable {
               + DEFAULT_HEARTBEATINTERVAL);
       heartBeatInterval = DEFAULT_HEARTBEATINTERVAL;
     }
-    maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS, DEFAULT_MAXOPENCONNECTIONS);
+    maxOpenConnections = context.getInteger(Config.MAX_OPEN_CONNECTIONS,
+                                            DEFAULT_MAXOPENCONNECTIONS);
     autoCreatePartitions =  context.getBoolean("autoCreatePartitions", true);
 
     // Timestamp processing
@@ -177,7 +176,7 @@ public class HiveSink extends AbstractSink implements Configurable {
       this.roundUnit = Calendar.HOUR_OF_DAY;
     } else if (unit.equalsIgnoreCase(Config.MINUTE)) {
       this.roundUnit = Calendar.MINUTE;
-    } else if (unit.equalsIgnoreCase(Config.SECOND)){
+    } else if (unit.equalsIgnoreCase(Config.SECOND)) {
       this.roundUnit = Calendar.SECOND;
     } else {
       LOG.warn(getName() + ". Rounding unit is not valid, please set one of " +
@@ -185,10 +184,10 @@ public class HiveSink extends AbstractSink implements Configurable {
       needRounding = false;
     }
     this.roundValue = context.getInteger(Config.ROUND_VALUE, 1);
-    if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE){
+    if (roundUnit == Calendar.SECOND || roundUnit == Calendar.MINUTE) {
       Preconditions.checkArgument(roundValue > 0 && roundValue <= 60,
               "Round value must be > 0 and <= 60");
-    } else if (roundUnit == Calendar.HOUR_OF_DAY){
+    } else if (roundUnit == Calendar.HOUR_OF_DAY) {
       Preconditions.checkArgument(roundValue > 0 && roundValue <= 24,
               "Round value must be > 0 and <= 24");
     }
@@ -215,8 +214,8 @@ public class HiveSink extends AbstractSink implements Configurable {
     return sinkCounter;
   }
   private HiveEventSerializer createSerializer(String serializerName)  {
-    if(serializerName.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 ||
-            serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) {
+    if (serializerName.compareToIgnoreCase(HiveDelimitedTextSerializer.ALIAS) == 0 ||
+        serializerName.compareTo(HiveDelimitedTextSerializer.class.getName()) == 0) {
       return new HiveDelimitedTextSerializer();
     } else if (serializerName.compareToIgnoreCase(HiveJsonSerializer.ALIAS) == 0 ||
             serializerName.compareTo(HiveJsonSerializer.class.getName()) == 0) {
@@ -345,7 +344,7 @@ public class HiveSink extends AbstractSink implements Configurable {
                 callTimeout, callTimeoutPool, proxyUser, serializer, sinkCounter);
 
         sinkCounter.incrementConnectionCreatedCount();
-        if (allWriters.size() > maxOpenConnections){
+        if (allWriters.size() > maxOpenConnections) {
           int retired = closeIdleWriters();
           if (retired == 0) {
             closeEldestWriter();
@@ -353,8 +352,7 @@ public class HiveSink extends AbstractSink implements Configurable {
         }
         allWriters.put(endPoint, writer);
         activeWriters.put(endPoint, writer);
-      }
-      else {
+      } else {
         if (activeWriters.get(endPoint) == null)  {
           activeWriters.put(endPoint,writer);
         }
@@ -425,7 +423,7 @@ public class HiveSink extends AbstractSink implements Configurable {
       }
     }
     //2) Retire them
-    for(HiveEndPoint ep : retirees) {
+    for (HiveEndPoint ep : retirees) {
       sinkCounter.incrementConnectionClosedCount();
       LOG.info(getName() + ": Closing idle Writer to Hive end point : {}", ep);
       allWriters.remove(ep).close();
@@ -440,7 +438,7 @@ public class HiveSink extends AbstractSink implements Configurable {
   private void closeAllWriters() throws InterruptedException {
     //1) Retire writers
     for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
-        entry.getValue().close();
+      entry.getValue().close();
     }
 
     //2) Clear cache
@@ -453,7 +451,7 @@ public class HiveSink extends AbstractSink implements Configurable {
    */
   private void abortAllWriters() throws InterruptedException {
     for (Entry<HiveEndPoint,HiveWriter> entry : allWriters.entrySet()) {
-        entry.getValue().abort();
+      entry.getValue().abort();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
index ec30c98..7106696 100644
--- a/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
+++ b/flume-ng-sinks/flume-hive-sink/src/main/java/org/apache/flume/sink/hive/HiveWriter.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,6 +18,18 @@
 
 package org.apache.flume.sink.hive;
 
+import org.apache.flume.Event;
+import org.apache.flume.instrumentation.SinkCounter;
+import org.apache.hive.hcatalog.streaming.HiveEndPoint;
+import org.apache.hive.hcatalog.streaming.RecordWriter;
+import org.apache.hive.hcatalog.streaming.SerializationError;
+import org.apache.hive.hcatalog.streaming.StreamingConnection;
+import org.apache.hive.hcatalog.streaming.StreamingException;
+import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
+import org.apache.hive.hcatalog.streaming.TransactionBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.concurrent.Callable;
@@ -27,24 +39,12 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import org.apache.hive.hcatalog.streaming.*;
-
-import org.apache.flume.Event;
-
-import org.apache.flume.instrumentation.SinkCounter;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
 /**
  * Internal API intended for HiveSink use.
  */
 class HiveWriter {
 
-  private static final Logger LOG = LoggerFactory
-      .getLogger(HiveWriter.class);
+  private static final Logger LOG = LoggerFactory.getLogger(HiveWriter.class);
 
   private final HiveEndPoint endPoint;
   private HiveEventSerializer serializer;
@@ -76,7 +76,7 @@ class HiveWriter {
              boolean autoCreatePartitions, long callTimeout,
              ExecutorService callTimeoutPool, String hiveUser,
              HiveEventSerializer serializer, SinkCounter sinkCounter)
-          throws ConnectException, InterruptedException {
+      throws ConnectException, InterruptedException {
     try {
       this.autoCreatePartitions = autoCreatePartitions;
       this.sinkCounter = sinkCounter;
@@ -130,13 +130,13 @@ class HiveWriter {
    * @throws InterruptedException
    */
   public synchronized void write(final Event event)
-          throws WriteException, InterruptedException {
+      throws WriteException, InterruptedException {
     if (closed) {
       throw new IllegalStateException("Writer closed. Cannot write to : " + endPoint);
     }
 
     batch.add(event);
-    if(batch.size()== writeBatchSz) {
+    if (batch.size() == writeBatchSz) {
       // write the event
       writeEventBatchToSerializer();
     }
@@ -147,7 +147,7 @@ class HiveWriter {
   }
 
   private void writeEventBatchToSerializer()
-          throws InterruptedException, WriteException {
+      throws InterruptedException, WriteException {
     try {
       timedCall(new CallRunner1<Void>() {
         @Override
@@ -180,15 +180,15 @@ class HiveWriter {
    *       new TxnBatch if current Txn batch is exhausted
    */
   public void flush(boolean rollToNext)
-          throws CommitException, TxnBatchException, TxnFailure, InterruptedException,
-          WriteException {
-    if(!batch.isEmpty()) {
+      throws CommitException, TxnBatchException, TxnFailure, InterruptedException,
+      WriteException {
+    if (!batch.isEmpty()) {
       writeEventBatchToSerializer();
       batch.clear();
     }
 
     //0 Heart beat on TxnBatch
-    if(hearbeatNeeded) {
+    if (hearbeatNeeded) {
       hearbeatNeeded = false;
       heartBeat();
     }
@@ -197,16 +197,16 @@ class HiveWriter {
     try {
       //1 commit txn & close batch if needed
       commitTxn();
-      if(txnBatch.remainingTransactions() == 0) {
+      if (txnBatch.remainingTransactions() == 0) {
         closeTxnBatch();
         txnBatch = null;
-        if(rollToNext) {
+        if (rollToNext) {
           txnBatch = nextTxnBatch(recordWriter);
         }
       }
 
       //2 roll to next Txn
-      if(rollToNext) {
+      if (rollToNext) {
         LOG.debug("Switching to next Txn for {}", endPoint);
         txnBatch.beginNextTransaction(); // does not block
       }
@@ -219,7 +219,7 @@ class HiveWriter {
    * Aborts the current Txn
    * @throws InterruptedException
    */
-  public void abort()  throws InterruptedException {
+  public void abort() throws InterruptedException {
     batch.clear();
     abortTxn();
   }
@@ -227,7 +227,7 @@ class HiveWriter {
   /** Queues up a heartbeat request on the current and remaining txns using the
    *  heartbeatThdPool and returns immediately
    */
-  public void heartBeat() throws InterruptedException  {
+  public void heartBeat() throws InterruptedException {
     // 1) schedule the heartbeat on one thread in pool
     try {
       timedCall(new CallRunner1<Void>() {
@@ -261,43 +261,43 @@ class HiveWriter {
 
 
   private void abortRemainingTxns() throws InterruptedException {
-      try {
-        if ( !isClosed(txnBatch.getCurrentTransactionState()) ) {
-          abortCurrTxnHelper();
-        }
+    try {
+      if (!isClosed(txnBatch.getCurrentTransactionState())) {
+        abortCurrTxnHelper();
+      }
 
-        // recursively abort remaining txns
-        if(txnBatch.remainingTransactions()>0) {
-          timedCall(
-                  new CallRunner1<Void>() {
-                    @Override
-                    public Void call() throws StreamingException, InterruptedException {
-                      txnBatch.beginNextTransaction();
-                      return null;
-                    }
-                  });
-          abortRemainingTxns();
-        }
-      } catch (StreamingException e) {
-        LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e);
-        return;
-      } catch (TimeoutException e) {
-        LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e);
-        return;
+      // recursively abort remaining txns
+      if (txnBatch.remainingTransactions() > 0) {
+        timedCall(
+            new CallRunner1<Void>() {
+              @Override
+              public Void call() throws StreamingException, InterruptedException {
+                txnBatch.beginNextTransaction();
+                return null;
+              }
+            });
+        abortRemainingTxns();
       }
+    } catch (StreamingException e) {
+      LOG.warn("Error when aborting remaining transactions in batch " + txnBatch, e);
+      return;
+    } catch (TimeoutException e) {
+      LOG.warn("Timed out when aborting remaining transactions in batch " + txnBatch, e);
+      return;
+    }
   }
 
   private void abortCurrTxnHelper() throws TimeoutException, InterruptedException {
     try {
       timedCall(
-              new CallRunner1<Void>() {
-                @Override
-                public Void call() throws StreamingException, InterruptedException {
-                  txnBatch.abort();
-                  LOG.info("Aborted txn " + txnBatch.getCurrentTxnId());
-                  return null;
-                }
-              }
+          new CallRunner1<Void>() {
+            @Override
+            public Void call() throws StreamingException, InterruptedException {
+              txnBatch.abort();
+              LOG.info("Aborted txn " + txnBatch.getCurrentTxnId());
+              return null;
+            }
+          }
       );
     } catch (StreamingException e) {
       LOG.warn("Unable to abort transaction " + txnBatch.getCurrentTxnId(), e);
@@ -306,10 +306,12 @@ class HiveWriter {
   }
 
   private boolean isClosed(TransactionBatch.TxnState txnState) {
-    if(txnState == TransactionBatch.TxnState.COMMITTED)
+    if (txnState == TransactionBatch.TxnState.COMMITTED) {
       return true;
-    if(txnState == TransactionBatch.TxnState.ABORTED)
+    }
+    if (txnState == TransactionBatch.TxnState.ABORTED) {
       return true;
+    }
     return false;
   }
 
@@ -360,7 +362,8 @@ class HiveWriter {
     } catch (InterruptedException e) {
       throw e;
     } catch (TimeoutException e) {
-      LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
+      LOG.warn("Timeout while aborting Txn " + txnBatch.getCurrentTxnId() +
+               " on EndPoint: " + endPoint, e);
     } catch (Exception e) {
       LOG.warn("Error aborting Txn " + txnBatch.getCurrentTxnId() + " on EndPoint: " + endPoint, e);
       // Suppressing exceptions as we don't care for errors on abort
@@ -368,9 +371,9 @@ class HiveWriter {
   }
 
   private StreamingConnection newConnection(final String proxyUser)
-          throws InterruptedException, ConnectException {
+      throws InterruptedException, ConnectException {
     try {
-      return  timedCall(new CallRunner1<StreamingConnection>() {
+      return timedCall(new CallRunner1<StreamingConnection>() {
         @Override
         public StreamingConnection call() throws InterruptedException, StreamingException {
           return endPoint.newConnection(autoCreatePartitions); // could block
@@ -382,7 +385,7 @@ class HiveWriter {
   }
 
   private TransactionBatch nextTxnBatch(final RecordWriter recordWriter)
-          throws InterruptedException, TxnBatchException {
+      throws InterruptedException, TxnBatchException {
     LOG.debug("Fetching new Txn Batch for {}", endPoint);
     TransactionBatch batch = null;
     try {
@@ -418,7 +421,7 @@ class HiveWriter {
   }
 
   private <T> T timedCall(final CallRunner1<T> callRunner)
-          throws TimeoutException, InterruptedException, StreamingException {
+      throws TimeoutException, InterruptedException, StreamingException {
     Future<T> future = callTimeoutPool.submit(new Callable<T>() {
       @Override
       public T call() throws StreamingException, InterruptedException, Failure {
@@ -439,7 +442,7 @@ class HiveWriter {
     } catch (ExecutionException e1) {
       sinkCounter.incrementConnectionFailedCount();
       Throwable cause = e1.getCause();
-      if (cause instanceof IOException ) {
+      if (cause instanceof IOException) {
         throw new StreamingException("I/O Failure", (IOException) cause);
       } else if (cause instanceof StreamingException) {
         throw (StreamingException) cause;
@@ -468,12 +471,10 @@ class HiveWriter {
     T call() throws Exception;
   }
 
-
   private interface CallRunner1<T> {
     T call() throws StreamingException, InterruptedException, Failure;
   }
 
-
   public static class Failure extends Exception {
     public Failure(String msg, Throwable cause) {
       super(msg, cause);
@@ -488,7 +489,7 @@ class HiveWriter {
 
   public static class CommitException extends Failure {
     public CommitException(HiveEndPoint endPoint, Long txnID, Throwable cause) {
-      super("Commit of Txn " + txnID +  " failed on EndPoint: " + endPoint, cause);
+      super("Commit of Txn " + txnID + " failed on EndPoint: " + endPoint, cause);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java b/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
index 40657b4..52bbfc8 100644
--- a/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
+++ b/flume-ng-sinks/flume-irc-sink/src/main/java/org/apache/flume/sink/irc/IRCSink.java
@@ -60,7 +60,7 @@ public class IRCSink extends AbstractSink implements Configurable {
   
   private CounterGroup counterGroup;
 
-  static public class IRCConnectionListener implements IRCEventListener {
+  public static class IRCConnectionListener implements IRCEventListener {
 
     public void onRegistered() {
     }
@@ -214,7 +214,7 @@ public class IRCSink extends AbstractSink implements Configurable {
     
     if (splitLines) {
       String[] lines = body.split(splitChars);
-      for(String line: lines) {
+      for (String line: lines) {
         connection.doPrivmsg(IRC_CHANNEL_PREFIX + this.chan, line);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
index 9996142..754155c 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/AbstractElasticSearchIndexRequestBuilderFactory.java
@@ -49,8 +49,7 @@ public abstract class AbstractElasticSearchIndexRequestBuilderFactory
    * Constructor for subclasses
    * @param fastDateFormat {@link FastDateFormat} to use for index names
    */
-  protected AbstractElasticSearchIndexRequestBuilderFactory(
-    FastDateFormat fastDateFormat) {
+  protected AbstractElasticSearchIndexRequestBuilderFactory(FastDateFormat fastDateFormat) {
     this.fastDateFormat = fastDateFormat;
   }
 
@@ -94,11 +93,11 @@ public abstract class AbstractElasticSearchIndexRequestBuilderFactory
 
   /**
    * Gets the name of the index to use for an index request
-   * @return index name of the form 'indexPrefix-formattedTimestamp'
    * @param indexPrefix
    *          Prefix of index name to use -- as configured on the sink
    * @param timestamp
    *          timestamp (millis) to format / use
+   * @return index name of the form 'indexPrefix-formattedTimestamp'
    */
   protected String getIndexName(String indexPrefix, long timestamp) {
     return new StringBuilder(indexPrefix).append('-')

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
index 1ca227a..f76308c 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchIndexRequestBuilderFactory.java
@@ -40,7 +40,6 @@ public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
       TimeZone.getTimeZone("Etc/UTC"));
 
   /**
-   * @return prepared ElasticSearch {@link IndexRequestBuilder} instance
    * @param client
    *          ElasticSearch {@link Client} to prepare index from
    * @param indexPrefix
@@ -49,6 +48,7 @@ public interface ElasticSearchIndexRequestBuilderFactory extends Configurable,
    *          Index type to use -- as configured on the sink
    * @param event
    *          Flume event to serialize and add to index request
+   * @return prepared ElasticSearch {@link IndexRequestBuilder} instance
    * @throws IOException
    *           If an error occurs e.g. during serialization
    */

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
index 1d9dfce..ebafb9f 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java
@@ -54,6 +54,7 @@ import com.google.common.base.Throwables;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_PREFIX;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.CLIENT_TYPE;
 import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.DEFAULT_CLIENT_TYPE;

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
index c71b2e5..d6cca50 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/EventSerializerIndexRequestBuilderFactory.java
@@ -33,7 +33,7 @@ import org.elasticsearch.common.io.BytesStream;
  * {@link ElasticSearchEventSerializer} instance configured on the sink.
  */
 public class EventSerializerIndexRequestBuilderFactory
-  extends AbstractElasticSearchIndexRequestBuilderFactory {
+    extends AbstractElasticSearchIndexRequestBuilderFactory {
 
   protected final ElasticSearchEventSerializer serializer;
 

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
index 873157a..cb34394 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchClientFactory.java
@@ -63,8 +63,10 @@ public class ElasticSearchClientFactory {
    *
    * @return Local elastic search instance client
    */
-  public ElasticSearchClient getLocalClient(String clientType, ElasticSearchEventSerializer serializer,
-          ElasticSearchIndexRequestBuilderFactory indexBuilder) throws NoSuchClientTypeException {
+  public ElasticSearchClient getLocalClient(String clientType,
+                                            ElasticSearchEventSerializer serializer,
+                                            ElasticSearchIndexRequestBuilderFactory indexBuilder)
+      throws NoSuchClientTypeException {
     if (clientType.equalsIgnoreCase(TransportClient) && serializer != null) {
       return new ElasticSearchTransportClient(serializer);
     } else if (clientType.equalsIgnoreCase(TransportClient) && indexBuilder != null)  {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
index 0d1c37f..e51efe2 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchRestClient.java
@@ -25,12 +25,6 @@ import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.sink.elasticsearch.ElasticSearchEventSerializer;
 import org.apache.flume.sink.elasticsearch.IndexNameBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.http.HttpResponse;
 import org.apache.http.HttpStatus;
 import org.apache.http.client.HttpClient;
@@ -39,6 +33,12 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.http.util.EntityUtils;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Rest ElasticSearch client which is responsible for sending bulks of events to
@@ -92,7 +92,8 @@ public class  ElasticSearchRestClient implements ElasticSearchClient {
   }
 
   @Override
-  public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType, long ttlMs) throws Exception {
+  public void addEvent(Event event, IndexNameBuilder indexNameBuilder, String indexType,
+                       long ttlMs) throws Exception {
     BytesReference content = serializer.getContentBuilder(event).bytes();
     Map<String, Map<String, String>> parameters = new HashMap<String, Map<String, String>>();
     Map<String, String> indexParameters = new HashMap<String, String>();
@@ -104,7 +105,7 @@ public class  ElasticSearchRestClient implements ElasticSearchClient {
     parameters.put(INDEX_OPERATION_NAME, indexParameters);
 
     Gson gson = new Gson();
-    synchronized(bulkBuilder) {
+    synchronized (bulkBuilder) {
       bulkBuilder.append(gson.toJson(parameters));
       bulkBuilder.append("\n");
       bulkBuilder.append(content.toBytesArray().toUtf8());
@@ -131,8 +132,10 @@ public class  ElasticSearchRestClient implements ElasticSearchClient {
       response = httpClient.execute(httpRequest);
       statusCode = response.getStatusLine().getStatusCode();
       logger.info("Status code from elasticsearch: " + statusCode);
-      if (response.getEntity() != null)
-        logger.debug("Status message from elasticsearch: " + EntityUtils.toString(response.getEntity(), "UTF-8"));
+      if (response.getEntity() != null) {
+        logger.debug("Status message from elasticsearch: " +
+                     EntityUtils.toString(response.getEntity(), "UTF-8"));
+      }
     }
 
     if (statusCode != HttpStatus.SC_OK) {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
index d44c8ad..2cf365e 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/ElasticSearchTransportClient.java
@@ -122,12 +122,10 @@ public class ElasticSearchTransportClient implements ElasticSearchClient {
 
   /**
    * Used for testing
-   *
-   * @param client ElasticSearch Client
-   * @param serializer Event Serializer
    */
   public ElasticSearchTransportClient(Client client,
-            ElasticSearchIndexRequestBuilderFactory requestBuilderFactory) throws IOException {
+                                      ElasticSearchIndexRequestBuilderFactory requestBuilderFactory)
+      throws IOException {
     this.client = client;
     requestBuilderFactory.createIndexRequest(client, null, null, null);
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
index dbad8d8..4cbbe91 100644
--- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
+++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/client/RoundRobinList.java
@@ -29,7 +29,7 @@ public class RoundRobinList<T> {
     iterator = this.elements.iterator();
   }
 
-  synchronized public T get() {
+  public synchronized T get() {
     if (iterator.hasNext()) {
       return iterator.next();
     } else {

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
index 28f0de1..280d0b0 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHBaseSink.java
@@ -18,27 +18,23 @@
  */
 package org.apache.flume.sink.hbase;
 
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.primitives.UnsignedBytes;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.stumbleupon.async.Callback;
 import org.apache.flume.Channel;
+import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
 import org.apache.flume.Transaction;
 import org.apache.flume.conf.Configurable;
+import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.AbstractSink;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -47,68 +43,68 @@ import org.apache.hadoop.hbase.zookeeper.ZKConfig;
 import org.hbase.async.AtomicIncrementRequest;
 import org.hbase.async.HBaseClient;
 import org.hbase.async.PutRequest;
-import org.jboss.netty.channel.socket.nio
-  .NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Charsets;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.stumbleupon.async.Callback;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import org.apache.flume.ChannelException;
-import org.apache.flume.instrumentation.SinkCounter;
 
 /**
-*
-* A simple sink which reads events from a channel and writes them to HBase.
-* This Sink uses an aysnchronous API internally and is likely to
-* perform better.
-* The Hbase configution is picked up from the first <tt>hbase-site.xml</tt>
-* encountered in the classpath. This sink supports batch reading of
-* events from the channel, and writing them to Hbase, to minimize the number
-* of flushes on the hbase tables. To use this sink, it has to be configured
-* with certain mandatory parameters:<p>
-*
-* <tt>table: </tt> The name of the table in Hbase to write to. <p>
-* <tt>columnFamily: </tt> The column family in Hbase to write to.<p>
-* Other optional parameters are:<p>
-* <tt>serializer:</tt> A class implementing
-*  {@link AsyncHbaseEventSerializer}.
-*  An instance of
-* this class will be used to serialize events which are written to hbase.<p>
-* <tt>serializer.*:</tt> Passed in the <code>configure()</code> method to
-* serializer
-* as an object of {@link org.apache.flume.Context}.<p>
-* <tt>batchSize: </tt>This is the batch size used by the client. This is the
-* maximum number of events the sink will commit per transaction. The default
-* batch size is 100 events.
-* <p>
-* <tt>timeout: </tt> The length of time in milliseconds the sink waits for
-* callbacks from hbase for all events in a transaction.
-* If no timeout is specified, the sink will wait forever.<p>
-*
-* <strong>Note: </strong> Hbase does not guarantee atomic commits on multiple
-* rows. So if a subset of events in a batch are written to disk by Hbase and
-* Hbase fails, the flume transaction is rolled back, causing flume to write
-* all the events in the transaction all over again, which will cause
-* duplicates. The serializer is expected to take care of the handling of
-* duplicates etc. HBase also does not support batch increments, so if
-* multiple increments are returned by the serializer, then HBase failure
-* will cause them to be re-written, when HBase comes back up.
-*/
+ * A simple sink which reads events from a channel and writes them to HBase.
+ * This Sink uses an asynchronous API internally and is likely to
+ * perform better.
+ * The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
+ * encountered in the classpath. This sink supports batch reading of
+ * events from the channel, and writing them to Hbase, to minimize the number
+ * of flushes on the hbase tables. To use this sink, it has to be configured
+ * with certain mandatory parameters:<p>
+ * <p>
+ * <tt>table: </tt> The name of the table in Hbase to write to. <p>
+ * <tt>columnFamily: </tt> The column family in Hbase to write to.<p>
+ * Other optional parameters are:<p>
+ * <tt>serializer:</tt> A class implementing
+ * {@link AsyncHbaseEventSerializer}.
+ * An instance of
+ * this class will be used to serialize events which are written to hbase.<p>
+ * <tt>serializer.*:</tt> Passed in the <code>configure()</code> method to
+ * serializer
+ * as an object of {@link org.apache.flume.Context}.<p>
+ * <tt>batchSize: </tt>This is the batch size used by the client. This is the
+ * maximum number of events the sink will commit per transaction. The default
+ * batch size is 100 events.
+ * <p>
+ * <tt>timeout: </tt> The length of time in milliseconds the sink waits for
+ * callbacks from hbase for all events in a transaction.
+ * If no timeout is specified, the sink will wait forever.<p>
+ * <p>
+ * <strong>Note: </strong> Hbase does not guarantee atomic commits on multiple
+ * rows. So if a subset of events in a batch are written to disk by Hbase and
+ * Hbase fails, the flume transaction is rolled back, causing flume to write
+ * all the events in the transaction all over again, which will cause
+ * duplicates. The serializer is expected to take care of the handling of
+ * duplicates etc. HBase also does not support batch increments, so if
+ * multiple increments are returned by the serializer, then HBase failure
+ * will cause them to be re-written, when HBase comes back up.
+ */
 public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
   private String tableName;
   private byte[] columnFamily;
   private long batchSize;
-  private static final Logger logger =
-          LoggerFactory.getLogger(AsyncHBaseSink.class);
+  private static final Logger logger = LoggerFactory.getLogger(AsyncHBaseSink.class);
   private AsyncHbaseEventSerializer serializer;
   private String eventSerializerType;
   private Context serializerContext;
@@ -138,10 +134,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
   // Does not need to be thread-safe. Always called only from the sink's
   // process method.
-  private final Comparator<byte[]> COMPARATOR = UnsignedBytes
-    .lexicographicalComparator();
+  private final Comparator<byte[]> COMPARATOR = UnsignedBytes.lexicographicalComparator();
 
-  public AsyncHBaseSink(){
+  public AsyncHBaseSink() {
     this(null);
   }
 
@@ -151,7 +146,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
   @VisibleForTesting
   AsyncHBaseSink(Configuration conf, boolean isTimeoutTest,
-    boolean isCoalesceTest) {
+                 boolean isCoalesceTest) {
     this.conf = conf;
     this.isTimeoutTest = isTimeoutTest;
     this.isCoalesceTest = isCoalesceTest;
@@ -189,17 +184,17 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
      * locks and conditions.
      */
     Callback<Object, Object> putSuccessCallback =
-            new SuccessCallback<Object, Object>(
+        new SuccessCallback<Object, Object>(
             lock, callbacksReceived, condition);
     Callback<Object, Exception> putFailureCallback =
-            new FailureCallback<Object, Exception>(
+        new FailureCallback<Object, Exception>(
             lock, callbacksReceived, txnFail, condition);
 
     Callback<Long, Long> incrementSuccessCallback =
-            new SuccessCallback<Long, Long>(
+        new SuccessCallback<Long, Long>(
             lock, callbacksReceived, condition);
     Callback<Long, Exception> incrementFailureCallback =
-            new FailureCallback<Long, Exception>(
+        new FailureCallback<Long, Exception>(
             lock, callbacksReceived, txnFail, condition);
 
     Status status = Status.READY;
@@ -235,9 +230,9 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
           for (AtomicIncrementRequest increment : increments) {
             if (batchIncrements) {
               CellIdentifier identifier = new CellIdentifier(increment.key(),
-                increment.qualifier());
+                  increment.qualifier());
               AtomicIncrementRequest request
-                = incrementBuffer.get(identifier);
+                  = incrementBuffer.get(identifier);
               if (request == null) {
                 incrementBuffer.put(identifier, increment);
               } else {
@@ -245,7 +240,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
               }
             } else {
               client.atomicIncrement(increment).addCallbacks(
-                incrementSuccessCallback, incrementFailureCallback);
+                  incrementSuccessCallback, incrementFailureCallback);
             }
           }
         }
@@ -254,7 +249,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
         Collection<AtomicIncrementRequest> increments = incrementBuffer.values();
         for (AtomicIncrementRequest increment : increments) {
           client.atomicIncrement(increment).addCallbacks(
-            incrementSuccessCallback, incrementFailureCallback);
+              incrementSuccessCallback, incrementFailureCallback);
         }
         callbacksExpected.addAndGet(increments.size());
       }
@@ -273,14 +268,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     long timeRemaining;
     try {
       while ((callbacksReceived.get() < callbacksExpected.get())
-              && !txnFail.get()) {
+          && !txnFail.get()) {
         timeRemaining = timeout - (System.nanoTime() - startTime);
         timeRemaining = (timeRemaining >= 0) ? timeRemaining : 0;
         try {
           if (!condition.await(timeRemaining, TimeUnit.NANOSECONDS)) {
             txnFail.set(true);
             logger.warn("HBase callbacks timed out. "
-                    + "Transaction will be rolled back.");
+                + "Transaction will be rolled back.");
           }
         } catch (Exception ex) {
           logger.error("Exception while waiting for callbacks from HBase.");
@@ -348,35 +343,35 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     Preconditions.checkNotNull(cf,
         "Column family cannot be empty, please specify in configuration file");
     //Check foe event serializer, if null set event serializer type
-    if(eventSerializerType == null || eventSerializerType.isEmpty()) {
+    if (eventSerializerType == null || eventSerializerType.isEmpty()) {
       eventSerializerType =
           "org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer";
       logger.info("No serializer defined, Will use default");
     }
     serializerContext.putAll(context.getSubProperties(
-            HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
+        HBaseSinkConfigurationConstants.CONFIG_SERIALIZER_PREFIX));
     columnFamily = cf.getBytes(Charsets.UTF_8);
     try {
       @SuppressWarnings("unchecked")
       Class<? extends AsyncHbaseEventSerializer> clazz =
-      (Class<? extends AsyncHbaseEventSerializer>)
-      Class.forName(eventSerializerType);
+          (Class<? extends AsyncHbaseEventSerializer>)
+              Class.forName(eventSerializerType);
       serializer = clazz.newInstance();
       serializer.configure(serializerContext);
       serializer.initialize(tableName.getBytes(Charsets.UTF_8), columnFamily);
     } catch (Exception e) {
-      logger.error("Could not instantiate event serializer." , e);
+      logger.error("Could not instantiate event serializer.", e);
       Throwables.propagate(e);
     }
 
-    if(sinkCounter == null) {
+    if (sinkCounter == null) {
       sinkCounter = new SinkCounter(this.getName());
     }
     timeout = context.getLong(HBaseSinkConfigurationConstants.CONFIG_TIMEOUT,
-            HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT);
-    if(timeout <= 0){
+        HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT);
+    if (timeout <= 0) {
       logger.warn("Timeout should be positive for Hbase sink. "
-              + "Sink will not timeout.");
+          + "Sink will not timeout.");
       timeout = HBaseSinkConfigurationConstants.DEFAULT_TIMEOUT;
     }
     //Convert to nanos.
@@ -384,7 +379,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
 
     zkQuorum = context.getString(
         HBaseSinkConfigurationConstants.ZK_QUORUM, "").trim();
-    if(!zkQuorum.isEmpty()) {
+    if (!zkQuorum.isEmpty()) {
       zkBaseDir = context.getString(
           HBaseSinkConfigurationConstants.ZK_ZNODE_PARENT,
           HBaseSinkConfigurationConstants.DEFAULT_ZK_ZNODE_PARENT);
@@ -394,32 +389,33 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       }
       zkQuorum = ZKConfig.getZKQuorumServersString(conf);
       zkBaseDir = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
-        HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+          HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
     }
     Preconditions.checkState(zkQuorum != null && !zkQuorum.isEmpty(),
         "The Zookeeper quorum cannot be null and should be specified.");
 
     enableWal = context.getBoolean(HBaseSinkConfigurationConstants
-      .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
+        .CONFIG_ENABLE_WAL, HBaseSinkConfigurationConstants.DEFAULT_ENABLE_WAL);
     logger.info("The write to WAL option is set to: " + String.valueOf(enableWal));
-    if(!enableWal) {
+    if (!enableWal) {
       logger.warn("AsyncHBaseSink's enableWal configuration is set to false. " +
-        "All writes to HBase will have WAL disabled, and any data in the " +
-        "memstore of this region in the Region Server could be lost!");
+          "All writes to HBase will have WAL disabled, and any data in the " +
+          "memstore of this region in the Region Server could be lost!");
     }
 
     batchIncrements = context.getBoolean(
-      HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
-      HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
+        HBaseSinkConfigurationConstants.CONFIG_COALESCE_INCREMENTS,
+        HBaseSinkConfigurationConstants.DEFAULT_COALESCE_INCREMENTS);
 
-    if(batchIncrements) {
+    if (batchIncrements) {
       incrementBuffer = Maps.newHashMap();
       logger.info("Increment coalescing is enabled. Increments will be " +
-        "buffered.");
+          "buffered.");
     }
 
-    maxConsecutiveFails = context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS,
-            HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS);
+    maxConsecutiveFails =
+        context.getInteger(HBaseSinkConfigurationConstants.CONFIG_MAX_CONSECUTIVE_FAILS,
+                           HBaseSinkConfigurationConstants.DEFAULT_MAX_CONSECUTIVE_FAILS);
 
   }
 
@@ -432,10 +428,11 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   boolean isConfNull() {
     return conf == null;
   }
+
   @Override
-  public void start(){
+  public void start() {
     Preconditions.checkArgument(client == null, "Please call stop "
-            + "before calling start on an old instance.");
+        + "before calling start on an old instance.");
     sinkCounter.start();
     sinkCounter.incrementConnectionCreatedCount();
     client = initHBaseClient();
@@ -446,31 +443,31 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     logger.info("Initializing HBase Client");
 
     sinkCallbackPool = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
-            .setNameFormat(this.getName() + " HBase Call Pool").build());
+        .setNameFormat(this.getName() + " HBase Call Pool").build());
     logger.info("Callback pool created");
     client = new HBaseClient(zkQuorum, zkBaseDir,
-            new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool));
+        new NioClientSocketChannelFactory(sinkCallbackPool, sinkCallbackPool));
 
     final CountDownLatch latch = new CountDownLatch(1);
     final AtomicBoolean fail = new AtomicBoolean(false);
     client.ensureTableFamilyExists(
-            tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks(
-            new Callback<Object, Object>() {
-              @Override
-              public Object call(Object arg) throws Exception {
-                latch.countDown();
-                logger.info("table found");
-                return null;
-              }
-            },
-            new Callback<Object, Object>() {
-              @Override
-              public Object call(Object arg) throws Exception {
-                fail.set(true);
-                latch.countDown();
-                return null;
-              }
-            });
+        tableName.getBytes(Charsets.UTF_8), columnFamily).addCallbacks(
+          new Callback<Object, Object>() {
+            @Override
+            public Object call(Object arg) throws Exception {
+              latch.countDown();
+              logger.info("table found");
+              return null;
+            }
+          },
+          new Callback<Object, Object>() {
+            @Override
+            public Object call(Object arg) throws Exception {
+              fail.set(true);
+              latch.countDown();
+              return null;
+            }
+          });
 
     try {
       logger.info("waiting on callback");
@@ -481,14 +478,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       throw new FlumeException(
           "Interrupted while waiting for Hbase Callbacks", e);
     }
-    if(fail.get()){
+    if (fail.get()) {
       sinkCounter.incrementConnectionFailedCount();
       if (client != null) {
         shutdownHBaseClient();
       }
       throw new FlumeException(
           "Could not start sink. " +
-          "Table or column family does not exist in Hbase.");
+              "Table or column family does not exist in Hbase.");
     } else {
       open = true;
     }
@@ -497,7 +494,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   }
 
   @Override
-  public void stop(){
+  public void stop() {
     serializer.cleanUp();
     if (client != null) {
       shutdownHBaseClient();
@@ -514,7 +511,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       }
     } catch (InterruptedException e) {
       logger.error("Interrupted while waiting for asynchbase sink pool to " +
-        "die", e);
+          "die", e);
       if (sinkCallbackPool != null) {
         sinkCallbackPool.shutdownNow();
       }
@@ -546,7 +543,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       });
       if (!waiter.await(timeout, TimeUnit.NANOSECONDS)) {
         logger.error("HBase connection could not be closed within timeout! HBase cluster might " +
-          "be down!");
+            "be down!");
       }
     } catch (Exception ex) {
       logger.warn("Error while attempting to close connections to HBase");
@@ -569,7 +566,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     } catch (Throwable e) {
       logger.error("Failed to commit transaction." +
           "Transaction rolled back.", e);
-      if(e instanceof Error || e instanceof RuntimeException){
+      if (e instanceof Error || e instanceof RuntimeException) {
         logger.error("Failed to commit transaction." +
             "Transaction rolled back.", e);
         Throwables.propagate(e);
@@ -583,14 +580,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
       txn.close();
     }
   }
-  private class SuccessCallback<R,T> implements Callback<R,T> {
+
+  private class SuccessCallback<R, T> implements Callback<R, T> {
     private Lock lock;
     private AtomicInteger callbacksReceived;
     private Condition condition;
     private final boolean isTimeoutTesting;
 
     public SuccessCallback(Lock lck, AtomicInteger callbacksReceived,
-            Condition condition) {
+                           Condition condition) {
       lock = lck;
       this.callbacksReceived = callbacksReceived;
       this.condition = condition;
@@ -614,7 +612,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     private void doCall() throws Exception {
       callbacksReceived.incrementAndGet();
       lock.lock();
-      try{
+      try {
         condition.signal();
       } finally {
         lock.unlock();
@@ -622,14 +620,15 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     }
   }
 
-  private class FailureCallback<R,T extends Exception> implements Callback<R,T> {
+  private class FailureCallback<R, T extends Exception> implements Callback<R, T> {
     private Lock lock;
     private AtomicInteger callbacksReceived;
     private AtomicBoolean txnFail;
     private Condition condition;
     private final boolean isTimeoutTesting;
+
     public FailureCallback(Lock lck, AtomicInteger callbacksReceived,
-            AtomicBoolean txnFail, Condition condition){
+                           AtomicBoolean txnFail, Condition condition) {
       this.lock = lck;
       this.callbacksReceived = callbacksReceived;
       this.txnFail = txnFail;
@@ -665,7 +664,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
   }
 
   private void checkIfChannelExceptionAndThrow(Throwable e)
-          throws EventDeliveryException {
+      throws EventDeliveryException {
     if (e instanceof ChannelException) {
       throw new EventDeliveryException("Error in processing transaction.", e);
     } else if (e instanceof Error || e instanceof RuntimeException) {
@@ -678,13 +677,14 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
     private final byte[] row;
     private final byte[] column;
     private final int hashCode;
+
     // Since the sink operates only on one table and one cf,
     // we use the data from the owning sink
     public CellIdentifier(byte[] row, byte[] column) {
       this.row = row;
       this.column = column;
       this.hashCode =
-        (Arrays.hashCode(row) * 31) * (Arrays.hashCode(column) * 31);
+          (Arrays.hashCode(row) * 31) * (Arrays.hashCode(column) * 31);
     }
 
     @Override
@@ -701,7 +701,7 @@ public class AsyncHBaseSink extends AbstractSink implements Configurable {
         return false;
       } else {
         return (COMPARATOR.compare(row, o.row) == 0
-          && COMPARATOR.compare(column, o.column) == 0);
+            && COMPARATOR.compare(column, o.column) == 0);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flume/blob/2252fb19/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java
index 9ae6c28..481fce8 100644
--- a/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java
+++ b/flume-ng-sinks/flume-ng-hbase-sink/src/main/java/org/apache/flume/sink/hbase/AsyncHbaseEventSerializer.java
@@ -34,8 +34,7 @@ import org.hbase.async.PutRequest;
  * of this interface is expected by the {@linkplain AsyncHBaseSink} to serialize
  * the events.
  */
-public interface AsyncHbaseEventSerializer extends Configurable,
-ConfigurableComponent {
+public interface AsyncHbaseEventSerializer extends Configurable, ConfigurableComponent {
 
   /**
    * Initialize the event serializer.
@@ -47,7 +46,7 @@ ConfigurableComponent {
   public void initialize(byte[] table, byte[] cf);
 
   /**
-   * @param Event to be written to HBase.
+   * @param event Event to be written to HBase
    */
   public void setEvent(Event event);
 

Reply via email to